1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-06-18 22:17:44 +02:00

feat(logger): add logger option to all micro components (override DefaultLogger) closes #2556 (#2559)

* feat(logger): add logger option to all components

* fix: refactor api/rpc.go

* fix: refactor api/stream.go

* fix: api/options.go comment

* fix(logger): do not use logger.Helper internally

* fix(logger): fix comments

* fix(logger): use level.Enabled method

* fix: rename mlogger to log

* fix: run go fmt

* fix: log level

* fix: factories

Co-authored-by: Mohamed MHAMDI <mmhamdi@hubside.com>
Co-authored-by: Davincible <david.brouwer.99@gmail.com>
This commit is contained in:
Mohamed MHAMDI
2022-09-29 16:44:53 +02:00
committed by GitHub
parent 57a0ef5a0f
commit 1db36357d5
63 changed files with 818 additions and 673 deletions

View File

@ -3,6 +3,7 @@ package handler
import (
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/client"
"go-micro.dev/v4/logger"
)
var (
@ -14,13 +15,17 @@ type Options struct {
Namespace string
Router router.Router
Client client.Client
Logger logger.Logger
}
type Option func(o *Options)
// NewOptions fills in the blanks
func NewOptions(opts ...Option) Options {
var options Options
options := Options{
Logger: logger.DefaultLogger,
}
for _, o := range opts {
o(&options)
}
@ -33,6 +38,10 @@ func NewOptions(opts ...Option) Options {
options.MaxRecvSize = DefaultMaxRecvSize
}
if options.Logger == nil {
options.Logger = logger.LoggerOrDefault(options.Logger)
}
return options
}
@ -56,9 +65,16 @@ func WithClient(c client.Client) Option {
}
}
// WithmaxRecvSize specifies max body size
// WithMaxRecvSize specifies max body size
func WithMaxRecvSize(size int64) Option {
return func(o *Options) {
o.MaxRecvSize = size
}
}
// WithLogger specifies the logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -11,6 +11,7 @@ import (
jsonpatch "github.com/evanphx/json-patch/v5"
"github.com/oxtoacart/bpool"
"go-micro.dev/v4/api/handler"
"go-micro.dev/v4/api/internal/proto"
"go-micro.dev/v4/api/router"
@ -19,7 +20,7 @@ import (
"go-micro.dev/v4/codec/jsonrpc"
"go-micro.dev/v4/codec/protorpc"
"go-micro.dev/v4/errors"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/metadata"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/selector"
@ -29,6 +30,7 @@ import (
const (
Handler = "rpc"
packageID = "go.micro.api"
)
var (
@ -73,6 +75,7 @@ func strategy(services []*registry.Service) selector.Strategy {
}
func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := h.opts.Logger
bsize := handler.DefaultMaxRecvSize
if h.opts.MaxRecvSize > 0 {
bsize = h.opts.MaxRecvSize
@ -87,13 +90,19 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// try get service from router
s, err := h.opts.Router.Route(r)
if err != nil {
writeError(w, r, errors.InternalServerError("go.micro.api", err.Error()))
werr := writeError(w, r, errors.InternalServerError(packageID, err.Error()))
if werr != nil {
logger.Log(log.ErrorLevel, werr)
}
return
}
service = s
} else {
// we have no way of routing the request
writeError(w, r, errors.InternalServerError("go.micro.api", "no route found"))
werr := writeError(w, r, errors.InternalServerError(packageID, "no route found"))
if werr != nil {
logger.Log(log.ErrorLevel, werr)
}
return
}
@ -133,7 +142,9 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// drop older context as it can have timeouts and create new
// md, _ := metadata.FromContext(cx)
//serveWebsocket(context.TODO(), w, r, service, c)
serveWebsocket(cx, w, r, service, c)
if err := serveWebsocket(cx, w, r, service, c); err != nil {
logger.Log(log.ErrorLevel, err)
}
return
}
@ -144,7 +155,9 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// get payload
br, err := requestPayload(r)
if err != nil {
writeError(w, r, err)
if werr := writeError(w, r, err); werr != nil {
logger.Log(log.ErrorLevel, werr)
}
return
}
@ -171,14 +184,18 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// make the call
if err := c.Call(cx, req, response, client.WithSelectOption(so)); err != nil {
writeError(w, r, err)
if werr := writeError(w, r, err); werr != nil {
logger.Log(log.ErrorLevel, werr)
}
return
}
// marshall response
rsp, err = response.Marshal()
if err != nil {
writeError(w, r, err)
if werr := writeError(w, r, err); werr != nil {
logger.Log(log.ErrorLevel, werr)
}
return
}
@ -206,20 +223,26 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
)
// make the call
if err := c.Call(cx, req, &response, client.WithSelectOption(so)); err != nil {
writeError(w, r, err)
if werr := writeError(w, r, err); werr != nil {
logger.Log(log.ErrorLevel, werr)
}
return
}
// marshall response
rsp, err = response.MarshalJSON()
if err != nil {
writeError(w, r, err)
if werr := writeError(w, r, err); werr != nil {
logger.Log(log.ErrorLevel, werr)
}
return
}
}
// write the response
writeResponse(w, r, rsp)
if err := writeResponse(w, r, rsp); err != nil {
logger.Log(log.ErrorLevel, err)
}
}
func (rh *rpcHandler) String() string {
@ -275,7 +298,9 @@ func requestPayload(r *http.Request) ([]byte, error) {
}
return raw.Marshal()
case strings.Contains(ct, "application/www-x-form-urlencoded"):
r.ParseForm()
if err := r.ParseForm(); err != nil {
return nil, err
}
// generate a new set of values from the form
vals := make(map[string]string)
@ -441,14 +466,14 @@ func requestPayload(r *http.Request) ([]byte, error) {
return []byte{}, nil
}
func writeError(w http.ResponseWriter, r *http.Request, err error) {
func writeError(w http.ResponseWriter, r *http.Request, err error) error {
ce := errors.Parse(err.Error())
switch ce.Code {
case 0:
// assuming it's totally screwed
ce.Code = 500
ce.Id = "go.micro.api"
ce.Id = packageID
ce.Status = http.StatusText(500)
ce.Detail = "error during request: " + ce.Detail
w.WriteHeader(500)
@ -468,14 +493,10 @@ func writeError(w http.ResponseWriter, r *http.Request, err error) {
}
_, werr := w.Write([]byte(ce.Error()))
if werr != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(werr)
}
}
return werr
}
func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) {
func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) error {
w.Header().Set("Content-Type", r.Header.Get("Content-Type"))
w.Header().Set("Content-Length", strconv.Itoa(len(rsp)))
@ -494,12 +515,7 @@ func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) {
// write response
_, err := w.Write(rsp)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
}
return err
}
func NewHandler(opts ...handler.Option) handler.Handler {

View File

@ -12,15 +12,15 @@ import (
"github.com/gobwas/httphead"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/client"
raw "go-micro.dev/v4/codec/bytes"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/selector"
)
// serveWebsocket will stream rpc back over websockets assuming json
func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *router.Route, c client.Client) {
func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *router.Route, c client.Client) (err error) {
var op ws.OpCode
ct := r.Header.Get("Content-Type")
@ -49,9 +49,6 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
}
payload, err := requestPayload(r)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
}
@ -72,18 +69,12 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
conn, rw, _, err := upgrader.Upgrade(r, w)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
}
defer func() {
if err := conn.Close(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
if cErr := conn.Close(); cErr != nil && err == nil {
err = cErr
}
}()
@ -114,22 +105,20 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
// create a new stream
stream, err := c.Stream(ctx, req, client.WithSelectOption(so))
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
}
if request != nil {
if err = stream.Send(request); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
}
}
go writeLoop(rw, stream)
go func() {
if wErr := writeLoop(rw, stream); wErr != nil && err == nil {
err = wErr
}
}()
rsp := stream.Response()
@ -137,49 +126,40 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
for {
select {
case <-ctx.Done():
return
return nil
case <-stream.Context().Done():
return
return nil
default:
// read backend response body
buf, err := rsp.Read()
if err != nil {
// wants to avoid import grpc/status.Status
if strings.Contains(err.Error(), "context canceled") {
return
return nil
}
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
return err
}
// write the response
if err := wsutil.WriteServerMessage(rw, op, buf); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
if err = wsutil.WriteServerMessage(rw, op, buf); err != nil {
return err
}
if err = rw.Flush(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
return err
}
}
}
}
// writeLoop
func writeLoop(rw io.ReadWriter, stream client.Stream) {
func writeLoop(rw io.ReadWriter, stream client.Stream) error {
// close stream when done
defer stream.Close()
for {
select {
case <-stream.Context().Done():
return
return nil
default:
buf, op, err := wsutil.ReadClientData(rw)
if err != nil {
@ -187,16 +167,13 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) {
switch wserr.Code {
case ws.StatusGoingAway:
// this happens when user leave the page
return
return nil
case ws.StatusNormalClosure, ws.StatusNoStatusRcvd:
// this happens when user close ws connection, or we don't get any status
return
return nil
}
}
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
return err
}
switch op {
default:
@ -210,10 +187,7 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) {
// if the extracted payload isn't empty lets use it
request := &raw.Frame{Data: buf}
if err := stream.Send(request); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
return
return err
}
}
}

View File

@ -3,6 +3,7 @@ package router
import (
"go-micro.dev/v4/api/resolver"
"go-micro.dev/v4/api/resolver/vpath"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
)
@ -10,6 +11,7 @@ type Options struct {
Handler string
Registry registry.Registry
Resolver resolver.Resolver
Logger logger.Logger
}
type Option func(o *Options)
@ -18,6 +20,7 @@ func NewOptions(opts ...Option) Options {
options := Options{
Handler: "meta",
Registry: registry.DefaultRegistry,
Logger: logger.DefaultLogger,
}
for _, o := range opts {
@ -50,3 +53,10 @@ func WithResolver(r resolver.Resolver) Option {
o.Resolver = r
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -12,7 +12,7 @@ import (
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/api/router/util"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/metadata"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/registry/cache"
@ -51,14 +51,13 @@ func (r *registryRouter) isStopped() bool {
// refresh list of api services
func (r *registryRouter) refresh() {
var attempts int
logger := r.Options().Logger
for {
services, err := r.opts.Registry.ListServices()
if err != nil {
attempts++
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("unable to list services: %v", err)
}
logger.Logf(log.ErrorLevel, "unable to list services: %v", err)
time.Sleep(time.Duration(attempts) * time.Second)
continue
}
@ -69,9 +68,7 @@ func (r *registryRouter) refresh() {
for _, s := range services {
service, err := r.rc.GetService(s.Name)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("unable to get service: %v", err)
}
logger.Logf(log.ErrorLevel, "unable to get service: %v", err)
continue
}
r.store(service)
@ -89,6 +86,7 @@ func (r *registryRouter) refresh() {
// process watch event
func (r *registryRouter) process(res *registry.Result) {
logger := r.Options().Logger
// skip these things
if res == nil || res.Service == nil {
return
@ -97,9 +95,7 @@ func (r *registryRouter) process(res *registry.Result) {
// get entry from cache
service, err := r.rc.GetService(res.Service.Name)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("unable to get service: %v", err)
}
logger.Logf(log.ErrorLevel, "unable to get service: %v", err)
return
}
@ -109,6 +105,7 @@ func (r *registryRouter) process(res *registry.Result) {
// store local endpoint cache
func (r *registryRouter) store(services []*registry.Service) {
logger := r.Options().Logger
// endpoints
eps := map[string]*router.Route{}
@ -129,9 +126,7 @@ func (r *registryRouter) store(services []*registry.Service) {
// if we got nothing skip
if err := router.Validate(end); err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("endpoint validation failed: %v", err)
}
logger.Logf(log.TraceLevel, "endpoint validation failed: %v", err)
continue
}
@ -176,9 +171,7 @@ func (r *registryRouter) store(services []*registry.Service) {
}
hostreg, err := regexp.CompilePOSIX(h)
if err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("endpoint have invalid host regexp: %v", err)
}
logger.Logf(log.TraceLevel, "endpoint have invalid host regexp: %v", err)
continue
}
cep.hostregs = append(cep.hostregs, hostreg)
@ -197,20 +190,16 @@ func (r *registryRouter) store(services []*registry.Service) {
rule, err := util.Parse(p)
if err != nil && !pcreok {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("endpoint have invalid path pattern: %v", err)
}
logger.Logf(log.TraceLevel, "endpoint have invalid path pattern: %v", err)
continue
} else if err != nil && pcreok {
continue
}
tpl := rule.Compile()
pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "")
pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "", util.PatternLogger(logger))
if err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("endpoint have invalid path pattern: %v", err)
}
logger.Logf(log.TraceLevel, "endpoint have invalid path pattern: %v", err)
continue
}
cep.pathregs = append(cep.pathregs, pathreg)
@ -223,6 +212,7 @@ func (r *registryRouter) store(services []*registry.Service) {
// watch for endpoint changes
func (r *registryRouter) watch() {
var attempts int
logger := r.Options().Logger
for {
if r.isStopped() {
@ -233,9 +223,7 @@ func (r *registryRouter) watch() {
w, err := r.opts.Registry.Watch()
if err != nil {
attempts++
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("error watching endpoints: %v", err)
}
logger.Logf(log.ErrorLevel, "error watching endpoints: %v", err)
time.Sleep(time.Duration(attempts) * time.Second)
continue
}
@ -258,9 +246,7 @@ func (r *registryRouter) watch() {
// process next event
res, err := w.Next()
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("error getting next endoint: %v", err)
}
logger.Logf(log.ErrorLevel, "error getting next endoint: %v", err)
close(ch)
break
}
@ -293,6 +279,7 @@ func (r *registryRouter) Deregister(ep *router.Route) error {
}
func (r *registryRouter) Endpoint(req *http.Request) (*router.Route, error) {
logger := r.Options().Logger
if r.isStopped() {
return nil, errors.New("router closed")
}
@ -325,9 +312,8 @@ func (r *registryRouter) Endpoint(req *http.Request) (*router.Route, error) {
if !mMatch {
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api method match %s", req.Method)
}
logger.Logf(log.DebugLevel, "api method match %s", req.Method)
// 2. try host
if len(ep.Host) == 0 {
@ -348,22 +334,17 @@ func (r *registryRouter) Endpoint(req *http.Request) (*router.Route, error) {
if !hMatch {
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api host match %s", req.URL.Host)
}
logger.Logf(log.DebugLevel, "api host match %s", req.URL.Host)
// 3. try path via google.api path matching
for _, pathreg := range cep.pathregs {
matches, err := pathreg.Match(path, "")
if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api gpath not match %s != %v", path, pathreg)
}
logger.Logf(log.DebugLevel, "api gpath not match %s != %v", path, pathreg)
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api gpath match %s = %v", path, pathreg)
}
logger.Logf(log.DebugLevel, "api gpath match %s = %v", path, pathreg)
pMatch = true
ctx := req.Context()
md, ok := metadata.FromContext(ctx)
@ -381,14 +362,10 @@ func (r *registryRouter) Endpoint(req *http.Request) (*router.Route, error) {
// 4. try path via pcre path matching
for _, pathreg := range cep.pcreregs {
if !pathreg.MatchString(req.URL.Path) {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api pcre path not match %s != %v", path, pathreg)
}
logger.Logf(log.DebugLevel, "api pcre path not match %s != %v", path, pathreg)
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api pcre path match %s != %v", path, pathreg)
}
logger.Logf(log.DebugLevel, "api pcre path match %s != %v", path, pathreg)
pMatch = true
break
}

View File

@ -10,7 +10,7 @@ import (
"go-micro.dev/v4/api/router"
"go-micro.dev/v4/api/router/util"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/metadata"
"go-micro.dev/v4/registry"
rutil "go-micro.dev/v4/util/registry"
@ -129,7 +129,7 @@ func (r *staticRouter) Register(route *router.Route) error {
}
tpl := rule.Compile()
pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "")
pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "", util.PatternLogger(r.Options().Logger))
if err != nil {
return err
}
@ -222,6 +222,7 @@ func (r *staticRouter) Endpoint(req *http.Request) (*router.Route, error) {
}
func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) {
logger := r.Options().Logger
if r.isStopd() {
return nil, errors.New("router closed")
}
@ -250,9 +251,7 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) {
if !mMatch {
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api method match %s", req.Method)
}
logger.Logf(log.DebugLevel, "api method match %s", req.Method)
// 2. try host
if len(ep.apiep.Host) == 0 {
@ -273,22 +272,16 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) {
if !hMatch {
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api host match %s", req.URL.Host)
}
logger.Logf(log.DebugLevel, "api host match %s", req.URL.Host)
// 3. try google.api path
for _, pathreg := range ep.pathregs {
matches, err := pathreg.Match(path, "")
if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api gpath not match %s != %v", path, pathreg)
}
logger.Logf(log.DebugLevel, "api gpath not match %s != %v", path, pathreg)
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api gpath match %s = %v", path, pathreg)
}
logger.Logf(log.DebugLevel, "api gpath match %s = %v", path, pathreg)
pMatch = true
ctx := req.Context()
md, ok := metadata.FromContext(ctx)
@ -306,9 +299,7 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) {
// 4. try path via pcre path matching
for _, pathreg := range ep.pcreregs {
if !pathreg.MatchString(req.URL.Path) {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api pcre path not match %s != %v", req.URL.Path, pathreg)
}
logger.Logf(log.DebugLevel, "api pcre path not match %s != %v", req.URL.Path, pathreg)
continue
}
pMatch = true

View File

@ -6,7 +6,7 @@ import (
"fmt"
"strings"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
)
// InvalidTemplateError indicates that the path template is not valid.
@ -98,38 +98,36 @@ func tokenize(path string) (tokens []string, verb string) {
type parser struct {
tokens []string
accepted []string
logger log.Logger
}
// topLevelSegments is the target of this parser.
func (p *parser) topLevelSegments() ([]segment, error) {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Parsing %q", p.tokens)
}
logger := log.LoggerOrDefault(p.logger)
logger.Logf(log.DebugLevel, "Parsing %q", p.tokens)
segs, err := p.segments()
if err != nil {
return nil, err
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("accept segments: %q; %q", p.accepted, p.tokens)
}
logger.Logf(log.DebugLevel, "accept segments: %q; %q", p.accepted, p.tokens)
if _, err := p.accept(typeEOF); err != nil {
return nil, fmt.Errorf("unexpected token %q after segments %q", p.tokens[0], strings.Join(p.accepted, ""))
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("accept eof: %q; %q", p.accepted, p.tokens)
}
logger.Logf(log.DebugLevel, "accept eof: %q; %q", p.accepted, p.tokens)
return segs, nil
}
func (p *parser) segments() ([]segment, error) {
logger := log.LoggerOrDefault(p.logger)
s, err := p.segment()
if err != nil {
return nil, err
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("accept segment: %q; %q", p.accepted, p.tokens)
}
logger.Logf(log.DebugLevel, "accept segment: %q; %q", p.accepted, p.tokens)
segs := []segment{s}
for {
if _, err := p.accept("/"); err != nil {
@ -140,9 +138,7 @@ func (p *parser) segments() ([]segment, error) {
return segs, err
}
segs = append(segs, s)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("accept segment: %q; %q", p.accepted, p.tokens)
}
logger.Logf(log.DebugLevel, "accept segment: %q; %q", p.accepted, p.tokens)
}
}
@ -270,6 +266,7 @@ func (p *parser) accept(term termType) (string, error) {
// expectPChars determines if "t" consists of only pchars defined in RFC3986.
//
// https://www.ietf.org/rfc/rfc3986.txt, P.49
//
// pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
// unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
// sub-delims = "!" / "$" / "&" / "'" / "(" / ")"

View File

@ -7,7 +7,7 @@ import (
"fmt"
"strings"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
)
var (
@ -43,11 +43,19 @@ type Pattern struct {
type patternOptions struct {
assumeColonVerb bool
logger log.Logger
}
// PatternOpt is an option for creating Patterns.
type PatternOpt func(*patternOptions)
// Logger sets the logger
func PatternLogger(l log.Logger) PatternOpt {
return func(po *patternOptions) {
po.logger = l
}
}
// NewPattern returns a new Pattern from the given definition values.
// "ops" is a sequence of op codes. "pool" is a constant pool.
// "verb" is the verb part of the pattern. It is empty if the pattern does not have the part.
@ -61,18 +69,16 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
o(&options)
}
logger := log.LoggerOrDefault(options.logger)
if version != 1 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("unsupported version: %d", version)
}
logger.Logf(log.DebugLevel, "unsupported version: %d", version)
return Pattern{}, ErrInvalidPattern
}
l := len(ops)
if l%2 != 0 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("odd number of ops codes: %d", l)
}
logger.Logf(log.DebugLevel, "odd number of ops codes: %d", l)
return Pattern{}, ErrInvalidPattern
}
@ -95,18 +101,14 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
stack++
case OpPushM:
if pushMSeen {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("pushM appears twice")
}
logger.Logf(log.DebugLevel, "pushM appears twice")
return Pattern{}, ErrInvalidPattern
}
pushMSeen = true
stack++
case OpLitPush:
if op.operand < 0 || len(pool) <= op.operand {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("negative literal index: %d", op.operand)
}
logger.Logf(log.DebugLevel, "negative literal index: %d", op.operand)
return Pattern{}, ErrInvalidPattern
}
if pushMSeen {
@ -115,24 +117,18 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
stack++
case OpConcatN:
if op.operand <= 0 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("negative concat size: %d", op.operand)
}
logger.Logf(log.DebugLevel, "negative concat size: %d", op.operand)
return Pattern{}, ErrInvalidPattern
}
stack -= op.operand
if stack < 0 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("stack underflow")
}
logger.Logf(log.DebugLevel, "stack underflow")
return Pattern{}, ErrInvalidPattern
}
stack++
case OpCapture:
if op.operand < 0 || len(pool) <= op.operand {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("variable name index out of bound: %d", op.operand)
}
logger.Logf(log.DebugLevel, "variable name index out of bound: %d", op.operand)
return Pattern{}, ErrInvalidPattern
}
v := pool[op.operand]
@ -140,15 +136,11 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
vars = append(vars, v)
stack--
if stack < 0 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("stack underflow")
}
logger.Logf(log.DebugLevel, "stack underflow")
return Pattern{}, ErrInvalidPattern
}
default:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("invalid opcode: %d", op.code)
}
logger.Logf(log.DebugLevel, "invalid opcode: %d", op.code)
return Pattern{}, ErrInvalidPattern
}
@ -171,9 +163,7 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
// MustPattern is a helper function which makes it easier to call NewPattern in variable initialization.
func MustPattern(p Pattern, err error) Pattern {
if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Fatalf("Pattern initialization failed: %v", err)
}
log.Logf(log.FatalLevel, "Pattern initialization failed: %v", err)
}
return p
}

View File

@ -7,13 +7,16 @@ import (
"net"
"os"
"go-micro.dev/v4/api/server/acme"
"go-micro.dev/v4/logger"
"golang.org/x/crypto/acme/autocert"
"go-micro.dev/v4/api/server/acme"
log "go-micro.dev/v4/logger"
)
// autoCertACME is the ACME provider from golang.org/x/crypto/acme/autocert
type autocertProvider struct{}
type autocertProvider struct {
logger log.Logger
}
// Listen implements acme.Provider
func (a *autocertProvider) Listen(hosts ...string) (net.Listener, error) {
@ -22,6 +25,7 @@ func (a *autocertProvider) Listen(hosts ...string) (net.Listener, error) {
// TLSConfig returns a new tls config
func (a *autocertProvider) TLSConfig(hosts ...string) (*tls.Config, error) {
logger := log.LoggerOrDefault(a.logger)
// create a new manager
m := &autocert.Manager{
Prompt: autocert.AcceptTOS,
@ -31,9 +35,7 @@ func (a *autocertProvider) TLSConfig(hosts ...string) (*tls.Config, error) {
}
dir := cacheDir()
if err := os.MkdirAll(dir, 0700); err != nil {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("warning: autocert not using a cache: %v", err)
}
logger.Logf(log.InfoLevel, "warning: autocert not using a cache: %v", err)
} else {
m.Cache = autocert.DirCache(dir)
}

View File

@ -1,6 +1,10 @@
package acme
import "github.com/go-acme/lego/v4/challenge"
import (
"github.com/go-acme/lego/v4/challenge"
"go-micro.dev/v4/logger"
)
// Option (or Options) are passed to New() to configure providers
type Option func(o *Options)
@ -22,6 +26,9 @@ type Options struct {
// there's no defined interface, so if you consume this option
// sanity check it before using.
Cache interface{}
// Logger is the underling logging framework
Logger logger.Logger
}
// AcceptToS indicates whether you accept your CA's terms of service
@ -63,6 +70,13 @@ func Cache(c interface{}) Option {
}
}
// Logger sets the underline logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// DefaultOptions uses the Let's Encrypt Production CA, with DNS Challenge disabled.
func DefaultOptions() Options {
return Options{

View File

@ -9,9 +9,10 @@ import (
"sync"
"github.com/gorilla/handlers"
"go-micro.dev/v4/api/server"
"go-micro.dev/v4/api/server/cors"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
)
type httpServer struct {
@ -24,10 +25,7 @@ type httpServer struct {
}
func NewServer(address string, opts ...server.Option) server.Server {
var options server.Options
for _, o := range opts {
o(&options)
}
options := server.NewOptions(opts...)
return &httpServer{
opts: options,
@ -70,6 +68,7 @@ func (s *httpServer) Handle(path string, handler http.Handler) {
}
func (s *httpServer) Start() error {
logger := s.opts.Logger
var l net.Listener
var err error
@ -86,9 +85,7 @@ func (s *httpServer) Start() error {
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("HTTP API Listening on %s", l.Addr().String())
}
logger.Logf(log.InfoLevel, "HTTP API Listening on %s", l.Addr().String())
s.mtx.Lock()
s.address = l.Addr().String()
@ -97,7 +94,8 @@ func (s *httpServer) Start() error {
go func() {
if err := http.Serve(l, s.mux); err != nil {
// temporary fix
//logger.Fatal(err)
//logger.Log(log.FatalLevel, err)
logger.Log(log.ErrorLevel, err)
}
}()

View File

@ -2,9 +2,11 @@ package server
import (
"crypto/tls"
"go-micro.dev/v4/api/server/cors"
"net/http"
"go-micro.dev/v4/api/server/cors"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/api/resolver"
"go-micro.dev/v4/api/server/acme"
)
@ -21,10 +23,23 @@ type Options struct {
TLSConfig *tls.Config
Resolver resolver.Resolver
Wrappers []Wrapper
Logger logger.Logger
}
type Wrapper func(h http.Handler) http.Handler
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
}
for _, o := range opts {
o(&options)
}
return options
}
func WrapHandler(w Wrapper) Option {
return func(o *Options) {
o.Wrappers = append(o.Wrappers, w)
@ -78,3 +93,10 @@ func Resolver(r resolver.Resolver) Option {
o.Resolver = r
}
}
// Logger sets the underline logging framework
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -3,13 +3,19 @@ package auth
import (
"context"
"time"
"go-micro.dev/v4/logger"
)
func NewOptions(opts ...Option) Options {
var options Options
options := Options{
Logger: logger.DefaultLogger,
}
for _, o := range opts {
o(&options)
}
return options
}
@ -28,6 +34,8 @@ type Options struct {
PrivateKey string
// Addrs sets the addresses of auth
Addrs []string
// Logger is the underline logger
Logger logger.Logger
}
type Option func(o *Options)
@ -60,6 +68,13 @@ func PrivateKey(key string) Option {
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// Credentials sets the auth credentials
func Credentials(id, secret string) Option {
return func(o *Options) {

View File

@ -3,7 +3,6 @@ package broker
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
@ -17,6 +16,8 @@ import (
"time"
"github.com/google/uuid"
"golang.org/x/net/http2"
"go-micro.dev/v4/codec/json"
merr "go-micro.dev/v4/errors"
"go-micro.dev/v4/registry"
@ -24,7 +25,6 @@ import (
maddr "go-micro.dev/v4/util/addr"
mnet "go-micro.dev/v4/util/net"
mls "go-micro.dev/v4/util/tls"
"golang.org/x/net/http2"
)
// HTTP Broker is a point to point async broker
@ -107,11 +107,10 @@ func newTransport(config *tls.Config) *http.Transport {
}
func newHttpBroker(opts ...Option) Broker {
options := Options{
Codec: json.Marshaler{},
Context: context.TODO(),
Registry: registry.DefaultRegistry,
}
options := *NewOptions(opts...)
options.Registry = registry.DefaultRegistry
options.Codec = json.Marshaler{}
for _, o := range opts {
o(&options)

View File

@ -2,20 +2,20 @@
package broker
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"github.com/google/uuid"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
maddr "go-micro.dev/v4/util/addr"
mnet "go-micro.dev/v4/util/net"
)
type memoryBroker struct {
opts Options
opts *Options
addr string
sync.RWMutex
@ -24,7 +24,7 @@ type memoryBroker struct {
}
type memoryEvent struct {
opts Options
opts *Options
topic string
err error
message interface{}
@ -39,7 +39,7 @@ type memorySubscriber struct {
}
func (m *memoryBroker) Options() Options {
return m.opts
return *m.opts
}
func (m *memoryBroker) Address() string {
@ -84,7 +84,7 @@ func (m *memoryBroker) Disconnect() error {
func (m *memoryBroker) Init(opts ...Option) error {
for _, o := range opts {
o(&m.opts)
o(m.opts)
}
return nil
}
@ -190,9 +190,7 @@ func (m *memoryEvent) Message() *Message {
case []byte:
msg := &Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[memory]: failed to unmarshal: %v\n", err)
}
m.opts.Logger.Logf(log.ErrorLevel, "[memory]: failed to unmarshal: %v\n", err)
return nil
}
return msg
@ -223,14 +221,9 @@ func (m *memorySubscriber) Unsubscribe() error {
}
func NewMemoryBroker(opts ...Option) Broker {
options := Options{
Context: context.Background(),
}
options := NewOptions(opts...)
rand.Seed(time.Now().UnixNano())
for _, o := range opts {
o(&options)
}
return &memoryBroker{
opts: options,

View File

@ -5,6 +5,7 @@ import (
"crypto/tls"
"go-micro.dev/v4/codec"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
)
@ -13,6 +14,9 @@ type Options struct {
Secure bool
Codec codec.Marshaler
// Logger is the underlying logger
Logger logger.Logger
// Handler executed when error happens in broker mesage
// processing
ErrorHandler Handler
@ -58,6 +62,19 @@ func PublishContext(ctx context.Context) PublishOption {
type SubscribeOption func(*SubscribeOptions)
func NewOptions(opts ...Option) *Options {
options := Options{
Context: context.Background(),
Logger: logger.DefaultLogger,
}
for _, o := range opts {
o(&options)
}
return &options
}
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
opt := SubscribeOptions{
AutoAck: true,
@ -128,6 +145,13 @@ func TLSConfig(t *tls.Config) Option {
}
}
// Logger sets the underline logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) {

1
cache/cache.go vendored
View File

@ -22,7 +22,6 @@ var (
)
// Cache is the interface that wraps the cache.
//
type Cache interface {
// Get gets a cached value by key.
Get(ctx context.Context, key string) (interface{}, time.Time, error)

12
cache/options.go vendored
View File

@ -3,6 +3,8 @@ package cache
import (
"context"
"time"
"go-micro.dev/v4/logger"
)
// Options represents the options for the cache.
@ -13,6 +15,8 @@ type Options struct {
Address string
// Context should contain all implementation specific options, using context.WithValue.
Context context.Context
// Logger is the be used logger
Logger logger.Logger
}
// Option manipulates the Options passed.
@ -46,11 +50,19 @@ func WithContext(c context.Context) Option {
}
}
// WithLogger sets underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// NewOptions returns a new options struct.
func NewOptions(opts ...Option) Options {
options := Options{
Expiration: DefaultExpiration,
Items: make(map[string]Item),
Logger: logger.DefaultLogger,
}
for _, o := range opts {

View File

@ -6,6 +6,7 @@ import (
"go-micro.dev/v4/broker"
"go-micro.dev/v4/codec"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/selector"
"go-micro.dev/v4/transport"
@ -38,6 +39,9 @@ type Options struct {
// Default Call Options
CallOptions CallOptions
// Logger is the underline logger
Logger logger.Logger
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
@ -113,6 +117,7 @@ func NewOptions(options ...Option) Options {
Selector: selector.DefaultSelector,
Registry: registry.DefaultRegistry,
Transport: transport.DefaultTransport,
Logger: logger.DefaultLogger,
}
for _, o := range options {
@ -364,3 +369,10 @@ func WithRouter(r Router) Option {
o.Router = r
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -89,6 +89,7 @@ func (c *cliSource) String() string {
// command line flags have already been parsed.
//
// Example:
//
// cli.StringFlag{Name: "db-host"},
//
//

View File

@ -117,6 +117,7 @@ func (e *env) String() string {
// Underscores are delimiters for nesting, and all keys are lowercased.
//
// Example:
//
// "DATABASE_SERVER_HOST=localhost" will convert to
//
// {

View File

@ -89,6 +89,7 @@ func (fs *flagsrc) String() string {
// Hyphens are delimiters for nesting, and all keys are lowercased.
//
// Example:
//
// dbhost := flag.String("database-host", "localhost", "the db host name")
//
// {

View File

@ -8,18 +8,17 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
)
// NewStream returns an initialized memory stream
func NewStream(opts ...Option) (Stream, error) {
// parse the options
var options Options
for _, o := range opts {
o(&options)
}
return &mem{store: store.NewMemoryStore()}, nil
options := NewOptions(opts...)
return &mem{store: store.NewMemoryStore(), options: options}, nil
}
type subscriber struct {
@ -35,6 +34,7 @@ type subscriber struct {
}
type mem struct {
options *Options
store store.Store
subs []*subscriber
@ -147,10 +147,8 @@ func (m *mem) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) {
// lookup all events which match the topic (a blank topic will return all results)
recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix())
if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error looking up previous events: %v", err)
return
} else if err != nil {
if err != nil {
m.options.Logger.Logf(log.ErrorLevel, "Error looking up previous events: %v", err)
return
}
@ -172,7 +170,7 @@ func (m *mem) handleEvent(ev *Event) {
m.RLock()
subs := m.subs
m.RUnlock()
logger := m.options.Logger
// filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message
// being sent to two subscribers with the same queue.
filteredSubs := map[string]*subscriber{}
@ -186,16 +184,19 @@ func (m *mem) handleEvent(ev *Event) {
// send the message to each channel async (since one channel might be blocked)
for _, sub := range filteredSubs {
sendEvent(ev, sub)
go func(s *subscriber) {
if err := sendEvent(ev, s); err != nil {
logger.Log(log.ErrorLevel, err)
}
}(sub)
}
}
func sendEvent(ev *Event, sub *subscriber) {
go func(s *subscriber) {
func sendEvent(ev *Event, s *subscriber) error {
evCopy := *ev
if s.autoAck {
s.Channel <- evCopy
return
return nil
}
evCopy.SetAckFunc(ackFunc(s, evCopy))
evCopy.SetNackFunc(nackFunc(s, evCopy))
@ -212,20 +213,17 @@ func sendEvent(ev *Event, sub *subscriber) {
}
if s.retryLimit > -1 && count > s.retryLimit {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit)
}
s.Lock()
delete(s.retryMap, evCopy.ID)
s.Unlock()
return
return fmt.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit)
}
s.Channel <- evCopy
s.Lock()
s.retryMap[evCopy.ID] = count + 1
s.Unlock()
}
}(sub)
return nil
}
func ackFunc(s *subscriber, evCopy Event) func() error {

View File

@ -1,18 +1,44 @@
package events
import "time"
import (
"time"
type Options struct{}
"go-micro.dev/v4/logger"
)
type Options struct {
Logger logger.Logger
}
type Option func(o *Options)
func NewOptions(opts ...Option) *Options {
options := Options{
Logger: logger.DefaultLogger,
}
for _, o := range opts {
o(&options)
}
return &options
}
type StoreOptions struct {
TTL time.Duration
Backup Backup
Logger logger.Logger
}
type StoreOption func(o *StoreOptions)
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) StoreOption {
return func(o *StoreOptions) {
o.Logger = l
}
}
// PublishOptions contains all the options which can be provided when publishing an event
type PublishOptions struct {
// Metadata contains any keys which can be used to query the data, for example a customer id

View File

@ -5,7 +5,8 @@ import (
"time"
"github.com/pkg/errors"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
)
@ -22,6 +23,8 @@ func NewStore(opts ...StoreOption) Store {
options.TTL = time.Hour * 24
}
options.Logger = log.LoggerOrDefault(options.Logger)
// return the store
evs := &evStore{
opts: options,
@ -114,7 +117,7 @@ func (s *evStore) backupLoop() {
for {
err := s.opts.Backup.Snapshot(s.store)
if err != nil {
logger.Errorf("Error running backup %s", err)
s.opts.Logger.Logf(log.ErrorLevel, "Error running backup %s", err)
}
time.Sleep(1 * time.Hour)

View File

@ -135,7 +135,7 @@ func (l *defaultLogger) Log(level Level, v ...interface{}) {
func (l *defaultLogger) Logf(level Level, format string, v ...interface{}) {
// TODO decide does we need to write message if log level not used?
if level < l.opts.Level {
if !l.opts.Level.Enabled(level) {
return
}

View File

@ -129,3 +129,10 @@ func (h *Helper) WithError(err error) *Helper {
func (h *Helper) WithFields(fields map[string]interface{}) *Helper {
return &Helper{logger: h.logger.Fields(fields)}
}
func HelperOrDefault(h *Helper) *Helper {
if h == nil {
return DefaultHelper
}
return h
}

View File

@ -4,6 +4,9 @@ package logger
var (
// Default logger
DefaultLogger Logger = NewLogger()
// Default logger helper
DefaultHelper *Helper = NewHelper(DefaultLogger)
)
// Logger is a generic logging interface
@ -41,3 +44,10 @@ func Logf(level Level, format string, v ...interface{}) {
func String() string {
return DefaultLogger.String()
}
func LoggerOrDefault(l Logger) Logger {
if l == nil {
return DefaultLogger
}
return l
}

View File

@ -12,6 +12,7 @@ import (
"go-micro.dev/v4/config"
"go-micro.dev/v4/debug/profile"
"go-micro.dev/v4/debug/trace"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/runtime"
"go-micro.dev/v4/selector"
@ -35,7 +36,7 @@ type Options struct {
Runtime runtime.Runtime
Transport transport.Transport
Profile profile.Profile
Logger logger.Logger
// Before and After funcs
BeforeStart []func() error
BeforeStop []func() error
@ -64,6 +65,7 @@ func newOptions(opts ...Option) Options {
Transport: transport.DefaultTransport,
Context: context.Background(),
Signal: true,
Logger: logger.DefaultLogger,
}
for _, o := range opts {
@ -340,3 +342,10 @@ func AfterStop(fn func() error) Option {
o.AfterStop = append(o.AfterStop, fn)
}
}
// Logger sets the logger for the service
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -7,10 +7,11 @@ import (
"sync"
"time"
"go-micro.dev/v4/logger"
"golang.org/x/sync/singleflight"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
util "go-micro.dev/v4/util/registry"
"golang.org/x/sync/singleflight"
)
// Cache is the registry cache interface
@ -24,6 +25,8 @@ type Cache interface {
type Options struct {
// TTL is the cache TTL
TTL time.Duration
Logger log.Logger
}
type Option func(o *Options)
@ -320,7 +323,7 @@ func (c *cache) run(service string) {
c.Lock()
c.watchedRunning[service] = true
c.Unlock()
logger := c.opts.Logger
// reset watcher on exit
defer func() {
c.Lock()
@ -352,9 +355,7 @@ func (c *cache) run(service string) {
c.setStatus(err)
if a > 3 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("rcache: ", err, " backing off ", d)
}
logger.Logf(log.DebugLevel, "rcache: ", err, " backing off ", d)
a = 0
}
@ -377,9 +378,7 @@ func (c *cache) run(service string) {
c.setStatus(err)
if b > 3 {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debug("rcache: ", err, " backing off ", d)
}
logger.Logf(log.DebugLevel, "rcache: ", err, " backing off ", d)
b = 0
}
@ -468,6 +467,7 @@ func New(r registry.Registry, opts ...Option) Cache {
rand.Seed(time.Now().UnixNano())
options := Options{
TTL: DefaultTTL,
Logger: log.DefaultLogger,
}
for _, o := range opts {

View File

@ -2,6 +2,8 @@ package cache
import (
"time"
"go-micro.dev/v4/logger"
)
// WithTTL sets the cache TTL
@ -10,3 +12,10 @@ func WithTTL(t time.Duration) Option {
o.TTL = t
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -16,7 +16,8 @@ import (
"time"
"github.com/google/uuid"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/util/mdns"
)
@ -38,7 +39,7 @@ type mdnsEntry struct {
}
type mdnsRegistry struct {
opts Options
opts *Options
// the mdns domain
domain string
@ -128,14 +129,8 @@ func decode(record []string) (*mdnsTxt, error) {
return txt, nil
}
func newRegistry(opts ...Option) Registry {
options := Options{
Context: context.Background(),
Timeout: time.Millisecond * 100,
}
for _, o := range opts {
o(&options)
}
mergedOpts := append([]Option{Timeout(time.Millisecond * 100)}, opts...)
options := NewOptions(mergedOpts...)
// set the domain
domain := mdnsDomain
@ -155,19 +150,20 @@ func newRegistry(opts ...Option) Registry {
func (m *mdnsRegistry) Init(opts ...Option) error {
for _, o := range opts {
o(&m.opts)
o(m.opts)
}
return nil
}
func (m *mdnsRegistry) Options() Options {
return m.opts
return *m.opts
}
func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {
m.Lock()
defer m.Unlock()
logger := m.opts.Logger
entries, ok := m.services[service.Name]
// first entry, create wildcard used for list queries
if !ok {
@ -234,9 +230,8 @@ func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error
}
port, _ := strconv.Atoi(pt)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
}
logger.Logf(log.DebugLevel, "[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
// we got here, new node
s, err := mdns.NewMDNSService(
node.Id,
@ -305,6 +300,7 @@ func (m *mdnsRegistry) Deregister(service *Service, opts ...DeregisterOption) er
}
func (m *mdnsRegistry) GetService(service string, opts ...GetOption) ([]*Service, error) {
logger := m.opts.Logger
serviceMap := make(map[string]*Service)
entries := make(chan *mdns.ServiceEntry, 10)
done := make(chan bool)
@ -359,9 +355,7 @@ func (m *mdnsRegistry) GetService(service string, opts ...GetOption) ([]*Service
} else if len(e.AddrV6) > 0 {
addr = net.JoinHostPort(e.AddrV6.String(), fmt.Sprint(e.Port))
} else {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("[mdns]: invalid endpoint received: %v", e)
}
logger.Logf(log.InfoLevel, "[mdns]: invalid endpoint received: %v", e)
continue
}
s.Nodes = append(s.Nodes, &Node{

View File

@ -1,12 +1,12 @@
package registry
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
)
var (
@ -29,7 +29,7 @@ type record struct {
}
type memRegistry struct {
options Options
options *Options
sync.RWMutex
records map[string]map[string]*record
@ -37,13 +37,7 @@ type memRegistry struct {
}
func NewMemoryRegistry(opts ...Option) Registry {
options := Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
options := NewOptions(opts...)
records := getServiceRecords(options.Context)
if records == nil {
@ -62,6 +56,7 @@ func NewMemoryRegistry(opts ...Option) Registry {
}
func (m *memRegistry) ttlPrune() {
logger := m.options.Logger
prune := time.NewTicker(ttlPruneTime)
defer prune.Stop()
@ -73,9 +68,7 @@ func (m *memRegistry) ttlPrune() {
for version, record := range records {
for id, n := range record.Nodes {
if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Registry TTL expired for node %s of service %s", n.Id, name)
}
logger.Logf(log.DebugLevel, "Registry TTL expired for node %s of service %s", n.Id, name)
delete(m.records[name][version].Nodes, id)
}
}
@ -111,7 +104,7 @@ func (m *memRegistry) sendEvent(r *Result) {
func (m *memRegistry) Init(opts ...Option) error {
for _, o := range opts {
o(&m.options)
o(m.options)
}
// add services
@ -138,13 +131,13 @@ func (m *memRegistry) Init(opts ...Option) error {
}
func (m *memRegistry) Options() Options {
return m.options
return *m.options
}
func (m *memRegistry) Register(s *Service, opts ...RegisterOption) error {
m.Lock()
defer m.Unlock()
logger := m.options.Logger
var options RegisterOptions
for _, o := range opts {
o(&options)
@ -158,9 +151,7 @@ func (m *memRegistry) Register(s *Service, opts ...RegisterOption) error {
if _, ok := m.records[s.Name][s.Version]; !ok {
m.records[s.Name][s.Version] = r
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Registry added new service: %s, version: %s", s.Name, s.Version)
}
logger.Logf(log.DebugLevel, "Registry added new service: %s, version: %s", s.Name, s.Version)
go m.sendEvent(&Result{Action: "update", Service: s})
return nil
}
@ -186,18 +177,14 @@ func (m *memRegistry) Register(s *Service, opts ...RegisterOption) error {
}
if addedNodes {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Registry added new node to service: %s, version: %s", s.Name, s.Version)
}
logger.Logf(log.DebugLevel, "Registry added new node to service: %s, version: %s", s.Name, s.Version)
go m.sendEvent(&Result{Action: "update", Service: s})
return nil
}
// refresh TTL and timestamp
for _, n := range s.Nodes {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Updated registration for service: %s, version: %s", s.Name, s.Version)
}
logger.Logf(log.DebugLevel, "Updated registration for service: %s, version: %s", s.Name, s.Version)
m.records[s.Name][s.Version].Nodes[n.Id].TTL = options.TTL
m.records[s.Name][s.Version].Nodes[n.Id].LastSeen = time.Now()
}
@ -208,29 +195,23 @@ func (m *memRegistry) Register(s *Service, opts ...RegisterOption) error {
func (m *memRegistry) Deregister(s *Service, opts ...DeregisterOption) error {
m.Lock()
defer m.Unlock()
logger := m.options.Logger
if _, ok := m.records[s.Name]; ok {
if _, ok := m.records[s.Name][s.Version]; ok {
for _, n := range s.Nodes {
if _, ok := m.records[s.Name][s.Version].Nodes[n.Id]; ok {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Registry removed node from service: %s, version: %s", s.Name, s.Version)
}
logger.Logf(log.DebugLevel, "Registry removed node from service: %s, version: %s", s.Name, s.Version)
delete(m.records[s.Name][s.Version].Nodes, n.Id)
}
}
if len(m.records[s.Name][s.Version].Nodes) == 0 {
delete(m.records[s.Name], s.Version)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version)
}
logger.Logf(log.DebugLevel, "Registry removed service: %s, version: %s", s.Name, s.Version)
}
}
if len(m.records[s.Name]) == 0 {
delete(m.records, s.Name)
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Registry removed service: %s", s.Name)
}
logger.Logf(log.DebugLevel, "Registry removed service: %s", s.Name)
}
go m.sendEvent(&Result{Action: "delete", Service: s})
}

View File

@ -4,6 +4,8 @@ import (
"context"
"crypto/tls"
"time"
"go-micro.dev/v4/logger"
)
type Options struct {
@ -11,6 +13,7 @@ type Options struct {
Timeout time.Duration
Secure bool
TLSConfig *tls.Config
Logger logger.Logger
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
@ -44,6 +47,19 @@ type ListOptions struct {
Context context.Context
}
func NewOptions(opts ...Option) *Options {
options := Options{
Context: context.Background(),
Logger: logger.DefaultLogger,
}
for _, o := range opts {
o(&options)
}
return &options
}
// Addrs is the registry addresses to use
func Addrs(addrs ...string) Option {
return func(o *Options) {
@ -146,3 +162,10 @@ func Services(s map[string][]*Service) Option {
o.Context = context.WithValue(o.Context, servicesKey{}, s)
}
}
// Logger sets the underline logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
@ -14,7 +13,8 @@ import (
"time"
"github.com/nxadm/tail"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/runtime/local/git"
)
@ -24,7 +24,7 @@ const defaultNamespace = "default"
type runtime struct {
sync.RWMutex
// options configure runtime
options Options
options *Options
// used to stop the runtime
closed chan bool
// used to start new services
@ -39,12 +39,7 @@ type runtime struct {
// NewRuntime creates new local runtime and returns it
func NewRuntime(opts ...Option) Runtime {
// get default options
options := Options{}
// apply requested options
for _, o := range opts {
o(&options)
}
options := NewOptions(opts...)
// make the logs directory
path := filepath.Join(os.TempDir(), "micro", "logs")
@ -181,7 +176,7 @@ func (r *runtime) Init(opts ...Option) error {
defer r.Unlock()
for _, o := range opts {
o(&r.options)
o(r.options)
}
return nil
@ -192,6 +187,7 @@ func (r *runtime) run(events <-chan Event) {
t := time.NewTicker(time.Second * 5)
defer t.Stop()
logger := r.options.Logger
// process event processes an incoming event
processEvent := func(event Event, service *service, ns string) error {
// get current vals
@ -205,9 +201,7 @@ func (r *runtime) run(events <-chan Event) {
return nil
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime updating service %s in %v namespace", name, ns)
}
logger.Logf(log.DebugLevel, "Runtime updating service %s in %v namespace", name, ns)
// this will cause a delete followed by created
if err := r.Update(service.Service, UpdateNamespace(ns)); err != nil {
@ -234,13 +228,10 @@ func (r *runtime) run(events <-chan Event) {
}
// TODO: check service error
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime starting %s", service.Name)
}
logger.Logf(log.DebugLevel, "Runtime starting %s", service.Name)
if err := service.Start(); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error starting %s: %v", service.Name, err)
}
logger.Logf(log.DebugLevel, "Runtime error starting %s: %v", service.Name, err)
}
}
}
@ -250,18 +241,13 @@ func (r *runtime) run(events <-chan Event) {
continue
}
// TODO: check service error
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime starting service %s", service.Name)
}
logger.Logf(log.DebugLevel, "Runtime starting service %s", service.Name)
if err := service.Start(); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error starting service %s: %v", service.Name, err)
}
logger.Logf(log.DebugLevel, "Runtime error starting service %s: %v", service.Name, err)
}
case event := <-events:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime received notification event: %v", event)
}
logger.Logf(log.DebugLevel, "Runtime received notification event: %v", event)
// NOTE: we only handle Update events for now
switch event.Type {
case Update:
@ -273,22 +259,18 @@ func (r *runtime) run(events <-chan Event) {
r.RLock()
if _, ok := r.namespaces[ns]; !ok {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime unknown namespace: %s", ns)
}
logger.Logf(log.DebugLevel, "Runtime unknown namespace: %s", ns)
r.RUnlock()
continue
}
service, ok := r.namespaces[ns][fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)]
r.RUnlock()
if !ok {
logger.Debugf("Runtime unknown service: %s", event.Service)
logger.Logf(log.DebugLevel, "Runtime unknown service: %s", event.Service)
}
if err := processEvent(event, service, ns); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error updating service %s: %v", event.Service, err)
}
logger.Logf(log.DebugLevel, "Runtime error updating service %s: %v", event.Service, err)
}
continue
}
@ -301,17 +283,13 @@ func (r *runtime) run(events <-chan Event) {
for ns, services := range namespaces {
for _, service := range services {
if err := processEvent(event, service, ns); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime error updating service %s: %v", service.Name, err)
}
logger.Logf(log.DebugLevel, "Runtime error updating service %s: %v", service.Name, err)
}
}
}
}
case <-r.closed:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime stopped")
}
logger.Logf(log.DebugLevel, "Runtime stopped")
return
}
}
@ -357,11 +335,11 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error {
}
// create new service
service := newService(s, options)
service := newService(s, options, r.options.Logger)
f, err := os.OpenFile(logFile(service.Name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
r.options.Logger.Log(log.FatalLevel, err)
}
if service.output != nil {
@ -400,10 +378,12 @@ func (r *runtime) Logs(s *Service, options ...LogsOption) (LogStream, error) {
for _, o := range options {
o(&lopts)
}
ret := &logStream{
service: s.Name,
stream: make(chan LogRecord),
stop: make(chan bool),
logger: r.options.Logger,
}
fpath := logFile(s.Name)
@ -463,6 +443,7 @@ type logStream struct {
sync.Mutex
stop chan bool
err error
logger log.Logger
}
func (l *logStream) Chan() chan LogRecord {
@ -485,7 +466,7 @@ func (l *logStream) Stop() error {
close(l.stream)
err := l.tail.Stop()
if err != nil {
logger.Errorf("Error stopping tail: %v", err)
l.logger.Logf(log.ErrorLevel, "Error stopping tail: %v", err)
return err
}
}
@ -565,7 +546,7 @@ func (r *runtime) Update(s *Service, opts ...UpdateOption) error {
}
if err := service.Stop(); err != nil && err.Error() != "no such process" {
logger.Errorf("Error stopping service %s: %s", service.Name, err)
r.options.Logger.Logf(log.ErrorLevel, "Error stopping service %s: %s", service.Name, err)
return err
}
@ -590,9 +571,7 @@ func (r *runtime) Delete(s *Service, opts ...DeleteOption) error {
return nil
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime deleting service %s", s.Name)
}
r.options.Logger.Logf(log.DebugLevel, "Runtime deleting service %s", s.Name)
service, ok := srvs[serviceKey(s)]
if !ok {
@ -635,9 +614,7 @@ func (r *runtime) Start() error {
events, err = r.options.Scheduler.Notify()
if err != nil {
// TODO: should we bail here?
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to start update notifier")
}
r.options.Logger.Logf(log.DebugLevel, "Runtime failed to start update notifier")
}
}
@ -667,9 +644,7 @@ func (r *runtime) Stop() error {
// stop all the services
for _, services := range r.namespaces {
for _, service := range services {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime stopping %s", service.Name)
}
r.options.Logger.Logf(log.DebugLevel, "Runtime stopping %s", service.Name)
service.Stop()
}
}

View File

@ -17,7 +17,7 @@ type action int
type kubernetes struct {
sync.RWMutex
// options configure runtime
options runtime.Options
options *runtime.Options
// indicates if we're running
running bool
// used to stop the runtime
@ -235,7 +235,7 @@ func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOpti
func (k *kubernetes) run(events <-chan runtime.Event) {
t := time.NewTicker(time.Second * 10)
defer t.Stop()
logger := k.options.Logger
for {
select {
case <-t.C:
@ -243,9 +243,7 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
// - do we even need the ticker for k8s services?
case event := <-events:
// NOTE: we only handle Update events for now
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime received notification event: %v", event)
}
logger.Logf(log.DebugLevel, "Runtime received notification event: %v", event)
switch event.Type {
case runtime.Update:
// only process if there's an actual service
@ -277,9 +275,7 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
}, client.GetLabels(labels))
if err != nil {
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime update failed to get service %s: %v", event.Service, err)
}
logger.Logf(log.DebugLevel, "Runtime update failed to get service %s: %v", event.Service, err)
continue
}
@ -298,21 +294,15 @@ func (k *kubernetes) run(events <-chan runtime.Event) {
// update the build time
service.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", event.Timestamp.Unix())
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name)
}
logger.Logf(log.DebugLevel, "Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name)
if err := k.client.Update(deploymentResource(&service)); err != nil {
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime failed to update service %s: %v", event.Service, err)
}
logger.Logf(log.DebugLevel, "Runtime failed to update service %s: %v", event.Service, err)
continue
}
}
}
case <-k.closed:
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime stopped")
}
logger.Logf(log.DebugLevel, "Runtime stopped")
return
}
}
@ -324,7 +314,7 @@ func (k *kubernetes) Init(opts ...runtime.Option) error {
defer k.Unlock()
for _, o := range opts {
o(&k.options)
o(k.options)
}
return nil
@ -341,7 +331,7 @@ func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (ru
go func() {
records, err := klo.Read()
if err != nil {
log.Errorf("Failed to get logs for service '%v' from k8s: %v", err)
k.options.Logger.Logf(log.ErrorLevel, "Failed to get logs for service '%v' from k8s: %v", err)
return
}
// @todo: this might actually not run before podLogStream starts
@ -425,7 +415,7 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er
options.Image = k.getImage(s, options)
// create new service
service := newService(s, options)
service := newService(s, options, k.options.Logger)
// start the service
return service.Start(k.client, client.CreateNamespace(options.Namespace))
@ -542,7 +532,7 @@ func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) er
service := newService(s, runtime.CreateOptions{
Type: k.options.Type,
Namespace: options.Namespace,
})
}, k.options.Logger)
return service.Stop(k.client, client.DeleteNamespace(options.Namespace))
}
@ -567,9 +557,7 @@ func (k *kubernetes) Start() error {
events, err = k.options.Scheduler.Notify()
if err != nil {
// TODO: should we bail here?
if log.V(log.DebugLevel, log.DefaultLogger) {
log.Debugf("Runtime failed to start update notifier")
}
k.options.Logger.Logf(log.DebugLevel, "Runtime failed to start update notifier")
}
}
@ -611,15 +599,9 @@ func (k *kubernetes) String() string {
// NewRuntime creates new kubernetes runtime
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
// get default options
options := runtime.Options{
// Create labels with type "micro": "service"
Type: "service",
}
// apply requested options
for _, o := range opts {
o(&options)
}
mtops := append([]runtime.Option{runtime.WithType("service")}, opts...)
options := runtime.NewOptions(mtops...)
// kubernetes client
client := client.NewClusterClient()

View File

@ -5,7 +5,7 @@ import (
"strings"
"time"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/runtime"
"go-micro.dev/v4/util/kubernetes/api"
"go-micro.dev/v4/util/kubernetes/client"
@ -18,6 +18,8 @@ type service struct {
kservice *client.Service
// Kubernetes deployment
kdeploy *client.Deployment
// to be used logger
logger log.Logger
}
func parseError(err error) *api.Status {
@ -26,7 +28,7 @@ func parseError(err error) *api.Status {
return status
}
func newService(s *runtime.Service, c runtime.CreateOptions) *service {
func newService(s *runtime.Service, c runtime.CreateOptions, l log.Logger) *service {
// use pre-formatted name/version
name := client.Format(s.Name)
version := client.Format(s.Version)
@ -93,6 +95,7 @@ func newService(s *runtime.Service, c runtime.CreateOptions) *service {
Service: s,
kservice: kservice,
kdeploy: kdeploy,
logger: log.LoggerOrDefault(l),
}
}
@ -116,9 +119,7 @@ func serviceResource(s *client.Service) *client.Resource {
func (s *service) Start(k client.Client, opts ...client.CreateOption) error {
// create deployment first; if we fail, we dont create service
if err := k.Create(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to create deployment: %v", err)
}
s.logger.Logf(log.DebugLevel, "Runtime failed to create deployment: %v", err)
s.Status("error", err)
v := parseError(err)
if v.Reason == "AlreadyExists" {
@ -128,9 +129,7 @@ func (s *service) Start(k client.Client, opts ...client.CreateOption) error {
}
// create service now that the deployment has been created
if err := k.Create(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to create service: %v", err)
}
s.logger.Logf(log.DebugLevel, "Runtime failed to create service: %v", err)
s.Status("error", err)
v := parseError(err)
if v.Reason == "AlreadyExists" {
@ -147,17 +146,13 @@ func (s *service) Start(k client.Client, opts ...client.CreateOption) error {
func (s *service) Stop(k client.Client, opts ...client.DeleteOption) error {
// first attempt to delete service
if err := k.Delete(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to delete service: %v", err)
}
s.logger.Logf(log.DebugLevel, "Runtime failed to delete service: %v", err)
s.Status("error", err)
return err
}
// delete deployment once the service has been deleted
if err := k.Delete(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to delete deployment: %v", err)
}
s.logger.Logf(log.DebugLevel, "Runtime failed to delete deployment: %v", err)
s.Status("error", err)
return err
}
@ -169,16 +164,12 @@ func (s *service) Stop(k client.Client, opts ...client.DeleteOption) error {
func (s *service) Update(k client.Client, opts ...client.UpdateOption) error {
if err := k.Update(deploymentResource(s.kdeploy), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to update deployment: %v", err)
}
s.logger.Logf(log.DebugLevel, "Runtime failed to update deployment: %v", err)
s.Status("error", err)
return err
}
if err := k.Update(serviceResource(s.kservice), opts...); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime failed to update service: %v", err)
}
s.logger.Logf(log.DebugLevel, "Runtime failed to update service: %v", err)
return err
}

View File

@ -84,7 +84,7 @@ func NewBuilder(opts ...build.Option) build.Builder {
endpoint := "unix:///var/run/docker.sock"
client, err := docker.NewClient(endpoint)
if err != nil {
logger.Fatal(err)
logger.Log(logger.FatalLevel, err)
}
return &Builder{
Options: options,

View File

@ -5,6 +5,7 @@ import (
"io"
"go-micro.dev/v4/client"
"go-micro.dev/v4/logger"
)
type Option func(o *Options)
@ -21,6 +22,20 @@ type Options struct {
Image string
// Client to use when making requests
Client client.Client
// Logger underline logger
Logger logger.Logger
}
func NewOptions(opts ...Option) *Options {
options := &Options{
Logger: logger.DefaultLogger,
}
for _, o := range opts {
o(options)
}
return options
}
// WithSource sets the base image / repository
@ -58,6 +73,13 @@ func WithClient(c client.Client) Option {
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
type CreateOption func(o *CreateOptions)
type ReadOption func(o *ReadOptions)

View File

@ -8,7 +8,7 @@ import (
"sync"
"time"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/runtime/local/build"
"go-micro.dev/v4/runtime/local/process"
proc "go-micro.dev/v4/runtime/local/process/os"
@ -36,9 +36,11 @@ type service struct {
Exec *process.Executable
// process pid
PID *process.PID
// to be used logger
Logger log.Logger
}
func newService(s *Service, c CreateOptions) *service {
func newService(s *Service, c CreateOptions, l log.Logger) *service {
var exec string
var args []string
@ -58,6 +60,7 @@ func newService(s *Service, c CreateOptions) *service {
Args: args,
Dir: s.Source,
},
Logger: log.LoggerOrDefault(l),
closed: make(chan bool),
output: c.Output,
updated: time.Now(),
@ -101,7 +104,6 @@ func (s *service) Start() error {
if !s.shouldStart() {
return nil
}
// reset
s.err = nil
s.closed = make(chan bool)
@ -113,9 +115,7 @@ func (s *service) Start() error {
s.Status("starting", nil)
// TODO: pull source & build binary
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Runtime service %s forking new process", s.Service.Name)
}
s.Logger.Log(log.DebugLevel, "Runtime service %s forking new process", s.Service.Name)
p, err := s.Process.Fork(s.Exec)
if err != nil {
@ -207,15 +207,13 @@ func (s *service) Wait() {
if s.PID.ID != thisPID.ID {
// trying to update when it's already been switched out, ignore
logger.Warnf("Trying to update a process status but PID doesn't match. Old %s, New %s. Skipping update.", thisPID.ID, s.PID.ID)
s.Logger.Logf(log.WarnLevel, "Trying to update a process status but PID doesn't match. Old %s, New %s. Skipping update.", thisPID.ID, s.PID.ID)
return
}
// save the error
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Service %s terminated with error %s", s.Name, err)
}
s.Logger.Logf(log.ErrorLevel, "Service %s terminated with error %s", s.Name, err)
s.retries++
s.Status("error", err)
s.Metadata["retries"] = strconv.Itoa(s.retries)

View File

@ -3,6 +3,7 @@ package selector
import (
"context"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
)
@ -13,6 +14,8 @@ type Options struct {
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
// Logger is the underline logger
Logger logger.Logger
}
type SelectOptions struct {
@ -58,3 +61,10 @@ func WithStrategy(fn Strategy) SelectOption {
o.Strategy = fn
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -9,10 +9,36 @@ import (
"go-micro.dev/v4/broker"
"go-micro.dev/v4/codec"
"go-micro.dev/v4/debug/trace"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/transport"
)
type RouterOptions struct {
Logger logger.Logger
}
type RouterOption func(o *RouterOptions)
func newRouterOptions(opt ...RouterOption) RouterOptions {
opts := RouterOptions{
Logger: logger.DefaultLogger,
}
for _, o := range opt {
o(&opts)
}
return opts
}
// WithRouterLogger sets the underline router logger
func WithRouterLogger(l logger.Logger) RouterOption {
return func(o *RouterOptions) {
o.Logger = l
}
}
type Options struct {
Codecs map[string]codec.NewCodec
Broker broker.Broker
@ -28,6 +54,7 @@ type Options struct {
HdlrWrappers []HandlerWrapper
SubWrappers []SubscriberWrapper
ListenOptions []transport.ListenOption
Logger logger.Logger
// RegisterCheck runs a check function before registering the service
RegisterCheck func(context.Context) error
@ -53,6 +80,7 @@ func newOptions(opt ...Option) Options {
Metadata: map[string]string{},
RegisterInterval: DefaultRegisterInterval,
RegisterTTL: DefaultRegisterTTL,
Logger: logger.DefaultLogger,
}
for _, o := range opt {
@ -228,6 +256,13 @@ func WithRouter(r Router) Option {
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// Wait tells the server to wait for requests to finish before exiting
// If `wg` is nil, server only wait for completion of rpc handler.
// For user need finer grained control, pass a concrete `wg` here, server will

View File

@ -20,11 +20,12 @@ import (
"go-micro.dev/v4/codec"
merrors "go-micro.dev/v4/errors"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
)
var (
lastStreamResponseError = errors.New("EOS")
errLastStreamResponse = errors.New("EOS")
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
@ -60,6 +61,7 @@ type response struct {
// router represents an RPC router.
type router struct {
name string
ops RouterOptions
mu sync.Mutex // protects the serviceMap
serviceMap map[string]*service
@ -93,8 +95,9 @@ func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response)
return r.h(ctx, req, rsp)
}
func newRpcRouter() *router {
func newRpcRouter(opts ...RouterOption) *router {
return &router{
ops: newRouterOptions(opts...),
serviceMap: make(map[string]*service),
subscribers: make(map[string][]*subscriber),
}
@ -118,7 +121,7 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
// prepareMethod returns a methodType for the provided method or nil
// in case if the method was unsuitable.
func prepareMethod(method reflect.Method) *methodType {
func prepareMethod(method reflect.Method, logger log.Logger) *methodType {
mtype := method.Type
mname := method.Name
var replyType, argType, contextType reflect.Type
@ -141,7 +144,7 @@ func prepareMethod(method reflect.Method) *methodType {
replyType = mtype.In(3)
contextType = mtype.In(1)
default:
logger.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
logger.Logf(log.ErrorLevel, "method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
return nil
}
@ -149,7 +152,7 @@ func prepareMethod(method reflect.Method) *methodType {
// check stream type
streamType := reflect.TypeOf((*Stream)(nil)).Elem()
if !argType.Implements(streamType) {
logger.Errorf("%v argument does not implement Stream interface: %v", mname, argType)
logger.Logf(log.ErrorLevel, "%v argument does not implement Stream interface: %v", mname, argType)
return nil
}
} else {
@ -157,30 +160,30 @@ func prepareMethod(method reflect.Method) *methodType {
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
logger.Errorf("%v argument type not exported: %v", mname, argType)
logger.Logf(log.ErrorLevel, "%v argument type not exported: %v", mname, argType)
return nil
}
if replyType.Kind() != reflect.Ptr {
logger.Errorf("method %v reply type not a pointer: %v", mname, replyType)
logger.Logf(log.ErrorLevel, "method %v reply type not a pointer: %v", mname, replyType)
return nil
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
logger.Errorf("method %v reply type not exported: %v", mname, replyType)
logger.Logf(log.ErrorLevel, "method %v reply type not exported: %v", mname, replyType)
return nil
}
}
// Method needs one out.
if mtype.NumOut() != 1 {
logger.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
logger.Logf(log.ErrorLevel, "method %v has wrong number of outs: %v", mname, mtype.NumOut())
return nil
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
logger.Errorf("method %v returns %v not error", mname, returnType.String())
logger.Logf(log.ErrorLevel, "method %v returns %v not error", mname, returnType.String())
return nil
}
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
@ -266,7 +269,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
return nil
} else {
// no error, we send the special EOS error
return lastStreamResponseError
return errLastStreamResponse
}
}
@ -446,7 +449,7 @@ func (router *router) Handle(h Handler) error {
// Install the methods
for m := 0; m < s.typ.NumMethod(); m++ {
method := s.typ.Method(m)
if mt := prepareMethod(method); mt != nil {
if mt := prepareMethod(method, router.ops.Logger); mt != nil {
s.method[method.Name] = mt
}
}
@ -509,8 +512,8 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro
defer func() {
// recover any panics
if r := recover(); r != nil {
logger.Errorf("panic recovered: %v", r)
logger.Error(string(debug.Stack()))
router.ops.Logger.Logf(log.ErrorLevel, "panic recovered: %v", r)
router.ops.Logger.Log(log.ErrorLevel, string(debug.Stack()))
err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r)
}
}()

View File

@ -15,7 +15,8 @@ import (
"go-micro.dev/v4/broker"
"go-micro.dev/v4/codec"
raw "go-micro.dev/v4/codec/bytes"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/metadata"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/transport"
@ -131,6 +132,7 @@ func (s *rpcServer) HandleEvent(e broker.Event) error {
// ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) {
logger := s.opts.Logger
// global error tracking
var gerr error
// streams are multiplexed on Micro-Stream or Micro-Id header
@ -161,10 +163,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// recover any panics
if r := recover(); r != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
}
logger.Log(log.ErrorLevel, "panic recovered: ", r)
logger.Log(log.ErrorLevel, string(debug.Stack()))
}
}()
@ -227,7 +227,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
if !ok && stream {
// check if its a last stream EOS error
err := msg.Header["Micro-Error"]
if err == lastStreamResponseError.Error() {
if err == errLastStreamResponse.Error() {
pool.Release(psock)
continue
}
@ -383,10 +383,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// recover any panics for outbound process
if r := recover(); r != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
}
logger.Log(log.ErrorLevel, "panic recovered: ", r)
logger.Log(log.ErrorLevel, string(debug.Stack()))
}
}()
@ -414,8 +412,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// recover any panics for call handler
if r := recover(); r != nil {
logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
logger.Log(log.ErrorLevel, "panic recovered: ", r)
logger.Log(log.ErrorLevel, string(debug.Stack()))
}
}()
@ -430,11 +428,11 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// if the server request is an EOS error we let the socket know
// sometimes the socket is already closed on the other side, so we can ignore that error
alreadyClosed := serveRequestError == lastStreamResponseError && writeError == io.EOF
alreadyClosed := serveRequestError == errLastStreamResponse && writeError == io.EOF
// could not write error response
if writeError != nil && !alreadyClosed {
logger.Debugf("rpc: unable to write error response: %v", writeError)
logger.Logf(log.DebugLevel, "rpc: unable to write error response: %v", writeError)
}
}
}(id, psock)
@ -517,7 +515,7 @@ func (s *rpcServer) Register() error {
rsvc := s.rsvc
config := s.Options()
s.RUnlock()
logger := s.opts.Logger
regFunc := func(service *registry.Service) error {
// create registry options
rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)}
@ -650,9 +648,7 @@ func (s *rpcServer) Register() error {
s.RUnlock()
if !registered {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
logger.Logf(log.InfoLevel, "Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
// register the service
@ -702,9 +698,7 @@ func (s *rpcServer) Register() error {
if err != nil {
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Subscribing to topic: %s", sub.Topic())
}
logger.Logf(log.InfoLevel, "Subscribing to topic: %s", sub.Topic())
s.subscribers[sb] = []broker.Subscriber{sub}
}
if cacheService {
@ -718,7 +712,7 @@ func (s *rpcServer) Register() error {
func (s *rpcServer) Deregister() error {
var err error
var advt, host, port string
logger := s.opts.Logger
s.RLock()
config := s.Options()
s.RUnlock()
@ -763,9 +757,7 @@ func (s *rpcServer) Deregister() error {
Nodes: []*registry.Node{node},
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
}
logger.Logf(log.InfoLevel, "Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
if err := config.Registry.Deregister(service); err != nil {
return err
}
@ -788,9 +780,7 @@ func (s *rpcServer) Deregister() error {
for sb, subs := range s.subscribers {
for _, sub := range subs {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Unsubscribing %s from topic: %s", node.Id, sub.Topic())
}
logger.Logf(log.InfoLevel, "Unsubscribing %s from topic: %s", node.Id, sub.Topic())
sub.Unsubscribe()
}
s.subscribers[sb] = nil
@ -807,7 +797,7 @@ func (s *rpcServer) Start() error {
return nil
}
s.RUnlock()
logger := s.opts.Logger
config := s.Options()
// start listening on the transport
@ -816,9 +806,7 @@ func (s *rpcServer) Start() error {
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
}
logger.Logf(log.InfoLevel, "Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
// swap address
s.Lock()
@ -830,27 +818,19 @@ func (s *rpcServer) Start() error {
// connect to the broker
if err := config.Broker.Connect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Broker [%s] connect error: %v", bname, err)
}
logger.Logf(log.ErrorLevel, "Broker [%s] connect error: %v", bname, err)
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Connected to %s", bname, config.Broker.Address())
}
logger.Logf(log.InfoLevel, "Broker [%s] Connected to %s", bname, config.Broker.Address())
// use RegisterCheck func before register
if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
logger.Logf(log.ErrorLevel, "Server %s-%s register check error: %s", config.Name, config.Id, err)
} else {
// announce self to the world
if err = s.Register(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
logger.Logf(log.ErrorLevel, "Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
@ -871,9 +851,7 @@ func (s *rpcServer) Start() error {
// check the error and backoff
default:
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Accept error: %v", err)
}
logger.Logf(log.ErrorLevel, "Accept error: %v", err)
time.Sleep(time.Second)
continue
}
@ -906,25 +884,17 @@ func (s *rpcServer) Start() error {
s.RUnlock()
rerr := s.opts.RegisterCheck(s.opts.Context)
if rerr != nil && registered {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
}
logger.Logf(log.ErrorLevel, "Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
// deregister self in case of error
if err := s.Deregister(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
logger.Logf(log.ErrorLevel, "Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
} else if rerr != nil && !registered {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
logger.Logf(log.ErrorLevel, "Server %s-%s register check error: %s", config.Name, config.Id, err)
continue
}
if err := s.Register(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
logger.Logf(log.ErrorLevel, "Server %s-%s register error: %s", config.Name, config.Id, err)
}
// wait for exit
case ch = <-s.exit:
@ -940,9 +910,7 @@ func (s *rpcServer) Start() error {
if registered {
// deregister self
if err := s.Deregister(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
logger.Logf(log.ErrorLevel, "Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
}
@ -958,14 +926,10 @@ func (s *rpcServer) Start() error {
// close transport listener
ch <- ts.Close()
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Broker [%s] Disconnected from %s", bname, config.Broker.Address())
}
logger.Logf(log.InfoLevel, "Broker [%s] Disconnected from %s", bname, config.Broker.Address())
// disconnect the broker
if err := config.Broker.Disconnect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Broker [%s] Disconnect error: %v", bname, err)
}
logger.Logf(log.ErrorLevel, "Broker [%s] Disconnect error: %v", bname, err)
}
// swap back address

View File

@ -65,7 +65,7 @@ func (r *rpcStream) Recv(msg interface{}) error {
if len(req.Error) > 0 {
// Check the client closed the stream
switch req.Error {
case lastStreamResponseError.Error():
case errLastStreamResponse.Error():
// discard body
r.Unlock()
r.codec.ReadBody(nil)

View File

@ -8,8 +8,9 @@ import (
"time"
"github.com/google/uuid"
"go-micro.dev/v4/codec"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
signalutil "go-micro.dev/v4/util/signal"
)
@ -116,7 +117,6 @@ type Stream interface {
// func (g *Greeter) Hello(context, request, response) error {
// return nil
// }
//
type Handler interface {
Name() string
Handler() interface{}
@ -184,7 +184,6 @@ func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscr
// func (f *Foo) Bar(ctx, req, rsp) error {
// return nil
// }
//
func NewHandler(h interface{}, opts ...HandlerOption) Handler {
return DefaultServer.NewHandler(h, opts...)
}
@ -210,27 +209,21 @@ func Run() error {
ch := make(chan os.Signal, 1)
signal.Notify(ch, signalutil.Shutdown()...)
DefaultServer.Options().Logger.Logf(log.InfoLevel, "Received signal %s", <-ch)
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Received signal %s", <-ch)
}
return Stop()
}
// Start starts the default server
func Start() error {
config := DefaultServer.Options()
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Starting server %s id %s", config.Name, config.Id)
}
config.Logger.Logf(log.InfoLevel, "Starting server %s id %s", config.Name, config.Id)
return DefaultServer.Start()
}
// Stop stops the default server
func Stop() error {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Stopping server")
}
DefaultServer.Options().Logger.Logf(log.InfoLevel, "Stopping server")
return DefaultServer.Stop()
}

View File

@ -7,7 +7,7 @@ import (
"sync"
"go-micro.dev/v4/client"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/server"
"go-micro.dev/v4/store"
"go-micro.dev/v4/util/cmd"
@ -58,14 +58,14 @@ func (s *service) Init(opts ...Option) {
cmd.Store(&s.opts.Store),
cmd.Profile(&s.opts.Profile),
); err != nil {
logger.Fatal(err)
s.opts.Logger.Log(log.FatalLevel, err)
}
// Explicitly set the table name to the service name
name := s.opts.Cmd.App().Name
err := s.opts.Store.Init(store.Table(name))
if err != nil {
logger.Fatal(err)
s.opts.Logger.Log(log.FatalLevel, err)
}
})
}
@ -125,6 +125,8 @@ func (s *service) Stop() error {
}
func (s *service) Run() (err error) {
logger := s.opts.Logger
// exit when help flag is provided
for _, v := range os.Args[1:] {
if v == "-h" || v == "--help" {
@ -145,14 +147,12 @@ func (s *service) Run() (err error) {
defer func() {
err = s.opts.Profile.Stop()
if err != nil {
logger.Error(err)
logger.Log(log.ErrorLevel, err)
}
}()
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Starting [service] %s", s.Name())
}
logger.Logf(log.InfoLevel, "Starting [service] %s", s.Name())
if err = s.Start(); err != nil {
return err

View File

@ -5,6 +5,7 @@ import (
"time"
"go-micro.dev/v4/client"
"go-micro.dev/v4/logger"
)
// Options contains configuration for the Store
@ -21,6 +22,8 @@ type Options struct {
Context context.Context
// Client to use for RPC
Client client.Client
// Logger is the underline logger
Logger logger.Logger
}
// Option sets values in Options
@ -63,6 +66,13 @@ func WithClient(c client.Client) Option {
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// ReadOptions configures an individual Read operation
type ReadOptions struct {
Database, Table string

View File

@ -4,6 +4,8 @@ import (
"context"
"crypto/tls"
"time"
"go-micro.dev/v4/logger"
)
// Nodes sets the addresses to use
@ -47,3 +49,10 @@ func WithContext(c context.Context) Option {
o.Context = c
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -6,6 +6,8 @@ import (
"crypto/tls"
"errors"
"time"
"go-micro.dev/v4/logger"
)
var (
@ -41,6 +43,7 @@ type Options struct {
Prefix string
TLSConfig *tls.Config
Context context.Context
Logger logger.Logger
}
type Option func(o *Options)

View File

@ -7,6 +7,7 @@ import (
"time"
"go-micro.dev/v4/codec"
"go-micro.dev/v4/logger"
)
type Options struct {
@ -27,6 +28,8 @@ type Options struct {
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
// Logger is the underline logger
Logger logger.Logger
}
type DialOptions struct {
@ -104,6 +107,13 @@ func WithTimeout(d time.Duration) DialOption {
}
}
// WithLogger sets the underline logger
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
// NetListener Set net.Listener for httpTransport
func NetListener(customListener net.Listener) ListenOption {
return func(o *ListenOptions) {

View File

@ -3,13 +3,12 @@ package file
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log"
"os"
"go-micro.dev/v4/client"
"go-micro.dev/v4/logger"
proto "go-micro.dev/v4/util/file/proto"
)
@ -145,7 +144,7 @@ func (c *fc) DownloadAt(filename, saveFile string, blockId int) error {
return err
}
if stat.Type == "Directory" {
return errors.New(fmt.Sprintf("%s is directory.", filename))
return fmt.Errorf("%s is directory", filename)
}
blocks := int(stat.Size / blockSize)
@ -153,7 +152,7 @@ func (c *fc) DownloadAt(filename, saveFile string, blockId int) error {
blocks += 1
}
log.Printf("Download %s in %d blocks\n", filename, blocks-blockId)
logger.Logf(logger.InfoLevel, "Download %s in %d blocks\n", filename, blocks-blockId)
file, err := os.OpenFile(saveFile, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
@ -176,14 +175,14 @@ func (c *fc) DownloadAt(filename, saveFile string, blockId int) error {
}
if i%((blocks-blockId)/100+1) == 0 {
log.Printf("Downloading %s [%d/%d] blocks", filename, i-blockId+1, blocks-blockId)
logger.Logf(logger.InfoLevel, "Downloading %s [%d/%d] blocks", filename, i-blockId+1, blocks-blockId)
}
if rerr == io.EOF {
break
}
}
log.Printf("Download %s completed", filename)
logger.Logf(logger.InfoLevel, "Download %s completed", filename)
c.Close(sessionId)

View File

@ -6,11 +6,12 @@ import (
"path/filepath"
"sync"
"golang.org/x/net/context"
"go-micro.dev/v4/errors"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/server"
proto "go-micro.dev/v4/util/file/proto"
"golang.org/x/net/context"
)
// NewHandler is a handler that can be registered with a micro Server
@ -20,6 +21,7 @@ func NewHandler(readDir string) proto.FileHandler {
session: &session{
files: make(map[int64]*os.File),
},
logger: log.DefaultLogger,
}
}
@ -31,6 +33,7 @@ func RegisterHandler(s server.Server, readDir string) {
type handler struct {
readDir string
session *session
logger log.Logger
}
func (h *handler) Open(ctx context.Context, req *proto.OpenRequest, rsp *proto.OpenResponse) error {
@ -47,14 +50,14 @@ func (h *handler) Open(ctx context.Context, req *proto.OpenRequest, rsp *proto.O
rsp.Id = h.session.Add(file)
rsp.Result = true
logger.Debugf("Open %s, sessionId=%d", req.Filename, rsp.Id)
h.logger.Logf(log.DebugLevel, "Open %s, sessionId=%d", req.Filename, rsp.Id)
return nil
}
func (h *handler) Close(ctx context.Context, req *proto.CloseRequest, rsp *proto.CloseResponse) error {
h.session.Delete(req.Id)
logger.Debugf("Close sessionId=%d", req.Id)
h.logger.Logf(log.DebugLevel, "Close sessionId=%d", req.Id)
return nil
}
@ -73,7 +76,7 @@ func (h *handler) Stat(ctx context.Context, req *proto.StatRequest, rsp *proto.S
}
rsp.LastModified = fi.ModTime().Unix()
logger.Debugf("Stat %s, %#v", req.Filename, rsp)
h.logger.Logf(log.DebugLevel, "Stat %s, %#v", req.Filename, rsp)
return nil
}
@ -97,7 +100,7 @@ func (h *handler) Read(ctx context.Context, req *proto.ReadRequest, rsp *proto.R
rsp.Size = int64(n)
rsp.Data = rsp.Data[:n]
logger.Debugf("Read sessionId=%d, Offset=%d, n=%d", req.Id, req.Offset, rsp.Size)
h.logger.Logf(log.DebugLevel, "Read sessionId=%d, Offset=%d, n=%d", req.Id, req.Offset, rsp.Size)
return nil
}
@ -112,7 +115,7 @@ func (h *handler) Write(ctx context.Context, req *proto.WriteRequest, rsp *proto
return err
}
logger.Debugf("Write sessionId=%d, Offset=%d, n=%d", req.Id, req.Offset)
h.logger.Logf(log.DebugLevel, "Write sessionId=%d, Offset=%d, n=%d", req.Id, req.Offset)
return nil
}

View File

@ -4,10 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/metadata"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/selector"
@ -39,7 +39,7 @@ func WriteInternalServerError(w http.ResponseWriter, err error) {
"error": err.Error(),
})
if err != nil {
log.Println(err)
logger.Log(logger.ErrorLevel, err)
return
}
Write(w, "application/json", 500, string(rawBody))

View File

@ -20,7 +20,7 @@ var (
// path to kubernetes service account token
serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount"
// ErrReadNamespace is returned when the names could not be read from service account
ErrReadNamespace = errors.New("Could not read namespace from service account secret")
ErrReadNamespace = errors.New("could not read namespace from service account secret")
// DefaultImage is default micro image
DefaultImage = "micro/go-micro"
// DefaultNamespace is the default k8s namespace
@ -228,7 +228,7 @@ func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) {
// NewService returns default micro kubernetes service definition
func NewService(name, version, typ, namespace string) *Service {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("kubernetes default service: name: %s, version: %s", name, version)
logger.Logf(logger.TraceLevel, "kubernetes default service: name: %s, version: %s", name, version)
}
Labels := map[string]string{
@ -271,7 +271,7 @@ func NewService(name, version, typ, namespace string) *Service {
// NewService returns default micro kubernetes deployment definition
func NewDeployment(name, version, typ, namespace string) *Deployment {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version)
logger.Logf(logger.TraceLevel, "kubernetes default deployment: name: %s, version: %s", name, version)
}
Labels := map[string]string{

View File

@ -3,7 +3,6 @@ package mdns
import (
"context"
"fmt"
"log"
"net"
"strings"
"sync"
@ -12,6 +11,8 @@ import (
"github.com/miekg/dns"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"go-micro.dev/v4/logger"
)
// ServiceEntry is returned after we query for a service
@ -146,7 +147,7 @@ func Listen(entries chan<- *ServiceEntry, exit chan struct{}) error {
m.SetQuestion(e.Name, dns.TypePTR)
m.RecursionDesired = false
if err := client.sendQuery(m); err != nil {
log.Printf("[ERR] mdns: Failed to query instance %s: %v", e.Name, err)
logger.Logf(logger.ErrorLevel, "[mdns] failed to query instance %s: %v", e.Name, err)
}
}
}
@ -184,7 +185,7 @@ func newClient() (*client, error) {
uconn4, err4 := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4zero, Port: 0})
uconn6, err6 := net.ListenUDP("udp6", &net.UDPAddr{IP: net.IPv6zero, Port: 0})
if err4 != nil && err6 != nil {
log.Printf("[ERR] mdns: Failed to bind to udp port: %v %v", err4, err6)
logger.Logf(logger.ErrorLevel, "[mdns] failed to bind to udp port: %v %v", err4, err6)
}
if uconn4 == nil && uconn6 == nil {
@ -202,7 +203,7 @@ func newClient() (*client, error) {
mconn4, err4 := net.ListenUDP("udp4", mdnsWildcardAddrIPv4)
mconn6, err6 := net.ListenUDP("udp6", mdnsWildcardAddrIPv6)
if err4 != nil && err6 != nil {
log.Printf("[ERR] mdns: Failed to bind to udp port: %v %v", err4, err6)
logger.Logf(logger.ErrorLevel, "[mdns] failed to bind to udp port: %v %v", err4, err6)
}
if mconn4 == nil && mconn6 == nil {
@ -239,7 +240,7 @@ func newClient() (*client, error) {
}
if len(ifaces) == errCount1 && len(ifaces) == errCount2 {
return nil, fmt.Errorf("Failed to join multicast group on all interfaces!")
return nil, fmt.Errorf("failed to join multicast group on all interfaces")
}
c := &client{
@ -375,7 +376,7 @@ func (c *client) query(params *QueryParam) error {
m.SetQuestion(inp.Name, inp.Type)
m.RecursionDesired = false
if err := c.sendQuery(m); err != nil {
log.Printf("[ERR] mdns: Failed to query instance %s: %v", inp.Name, err)
logger.Logf(logger.ErrorLevel, "[mdns] failed to query instance %s: %v", inp.Name, err)
}
}
case <-params.Context.Done():

View File

@ -10,6 +10,7 @@ import "github.com/miekg/dns"
// register only the wrapped instance with the server.
//
// Example usage:
//
// service := &mdns.DNSSDService{
// MDNSService: &mdns.MDNSService{
// Instance: "My Foobar Service",

View File

@ -45,9 +45,12 @@ func Unmarshal(dst interface{}, query string) error {
}
// ToJSON will turn a query string like:
//
// cat=1&bar%5Bone%5D%5Btwo%5D=2&bar[one][red]=112
//
// Into a JSON object with all the data merged as nicely as
// possible. Eg the example above would output:
//
// {"bar":{"one":{"two":2,"red":112}}}
func ToJSON(query string) ([]byte, error) {
var (
@ -65,6 +68,7 @@ func ToJSON(query string) ([]byte, error) {
}
// queryToMap turns something like a[b][c]=4 into
//
// map[string]interface{}{
// "a": map[string]interface{}{
// "b": map[string]interface{}{

View File

@ -8,6 +8,7 @@ import (
"github.com/urfave/cli/v2"
"go-micro.dev/v4"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
)
@ -37,6 +38,7 @@ type Options struct {
Registry registry.Registry
Service micro.Service
Logger logger.Logger
Secure bool
TLSConfig *tls.Config
@ -63,6 +65,7 @@ func newOptions(opts ...Option) Options {
Service: micro.NewService(),
Context: context.TODO(),
Signal: true,
Logger: logger.DefaultLogger,
}
for _, o := range opts {
@ -257,3 +260,10 @@ func HandleSignal(b bool) Option {
o.Signal = b
}
}
// Logger sets the underline logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}

View File

@ -13,8 +13,10 @@ import (
"time"
"github.com/urfave/cli/v2"
"go-micro.dev/v4"
"go-micro.dev/v4/logger"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/registry"
maddr "go-micro.dev/v4/util/addr"
"go-micro.dev/v4/util/backoff"
@ -54,11 +56,13 @@ func (s *service) genSrv() *registry.Service {
var port string
var err error
logger := s.opts.Logger
// default host:port
if len(s.opts.Address) > 0 {
host, port, err = net.SplitHostPort(s.opts.Address)
if err != nil {
logger.Fatal(err)
logger.Log(log.FatalLevel, err)
}
}
@ -68,13 +72,13 @@ func (s *service) genSrv() *registry.Service {
if len(s.opts.Advertise) > 0 {
host, port, err = net.SplitHostPort(s.opts.Advertise)
if err != nil {
logger.Fatal(err)
logger.Log(log.FatalLevel, err)
}
}
addr, err := maddr.Extract(host)
if err != nil {
logger.Fatal(err)
logger.Log(log.FatalLevel, err)
}
if strings.Count(addr, ":") > 0 {
@ -120,6 +124,9 @@ func (s *service) register() error {
if s.srv == nil {
return nil
}
logger := s.opts.Logger
// default to service registry
r := s.opts.Service.Client().Options().Registry
// switch to option if specified
@ -134,9 +141,7 @@ func (s *service) register() error {
// use RegisterCheck func before register
if err := s.opts.RegisterCheck(s.opts.Context); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Server %s-%s register check error: %s", s.opts.Name, s.opts.Id, err)
}
logger.Logf(log.ErrorLevel, "Server %s-%s register check error: %s", s.opts.Name, s.opts.Id, err)
return err
}
@ -195,6 +200,8 @@ func (s *service) start() error {
return err
}
logger := s.opts.Logger
s.opts.Address = l.Addr().String()
srv := s.genSrv()
srv.Endpoints = s.srv.Endpoints
@ -221,9 +228,7 @@ func (s *service) start() error {
if s.static {
_, err := os.Stat(static)
if err == nil {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Enabling static file serving from %s", static)
}
logger.Logf(log.InfoLevel, "Enabling static file serving from %s", static)
s.mux.Handle("/", http.FileServer(http.Dir(static)))
}
}
@ -255,9 +260,7 @@ func (s *service) start() error {
ch <- l.Close()
}()
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Listening on %v", l.Addr().String())
}
logger.Logf(log.InfoLevel, "Listening on %v", l.Addr().String())
return nil
}
@ -279,9 +282,7 @@ func (s *service) stop() error {
s.exit <- ch
s.running = false
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Info("Stopping")
}
s.opts.Logger.Log(log.InfoLevel, "Stopping")
for _, fn := range s.opts.AfterStop {
if err := fn(); err != nil {
@ -473,6 +474,7 @@ func (s *service) Run() error {
return err
}
logger := s.opts.Logger
// start the profiler
if s.opts.Service.Options().Profile != nil {
// to view mutex contention
@ -485,7 +487,7 @@ func (s *service) Run() error {
}
defer func() {
if err := s.opts.Service.Options().Profile.Stop(); err != nil {
logger.Error(err)
logger.Log(log.ErrorLevel, err)
}
}()
}
@ -505,14 +507,10 @@ func (s *service) Run() error {
select {
// wait on kill signal
case sig := <-ch:
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Received signal %s", sig)
}
logger.Logf(log.InfoLevel, "Received signal %s", sig)
// wait on context cancel
case <-s.opts.Context.Done():
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Info("Received context shutdown")
}
logger.Log(log.InfoLevel, "Received context shutdown")
}
// exit reg loop

View File

@ -57,14 +57,14 @@ func testFunc() {
defer wg.Done()
err := s.Run()
if err != nil {
logger.Errorf("micro run error: %v", err)
logger.Logf(logger.ErrorLevel, "micro run error: %v", err)
}
}()
go func() {
defer wg.Done()
err := w.Run()
if err != nil {
logger.Errorf("web run error: %v", err)
logger.Logf(logger.ErrorLevel, "web run error: %v", err)
}
}()