1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

fix service default logger (#2171)

* Update http.go

Exit before deregister is executed

* Create http.go

Exit before deregister is executed

* Solve the problem that the resources have not been fully released due to early exit

* Optimize some code

* Optimize some code

* Optimize some code

* fix service default logger
This commit is contained in:
JeffreyBool 2021-05-23 15:38:20 +08:00 committed by GitHub
parent f48911d2c3
commit acc3f5479f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 48 additions and 72 deletions

View File

@ -160,12 +160,12 @@ func (l *defaultLogger) Logf(level Level, format string, v ...interface{}) {
fmt.Printf("%s %s %v\n", t, metadata, rec.Message)
}
func (n *defaultLogger) Options() Options {
func (l *defaultLogger) Options() Options {
// not guard against options Context values
n.RLock()
opts := n.opts
opts.Fields = copyFields(n.opts.Fields)
n.RUnlock()
l.RLock()
opts := l.opts
opts.Fields = copyFields(l.opts.Fields)
l.RUnlock()
return opts
}

View File

@ -42,26 +42,6 @@ type Function interface {
Subscribe(topic string, v interface{}) error
}
/*
// Type Event is a future type for acting on asynchronous events
type Event interface {
// Publish publishes a message to the event topic
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
// Subscribe to the event
Subscribe(ctx context.Context, v in
}
// Resource is a future type for defining dependencies
type Resource interface {
// Name of the resource
Name() string
// Type of resource
Type() string
// Method of creation
Create() error
}
*/
// Event is used to publish messages to a topic
type Event interface {
// Publish publishes a message to the event topic
@ -73,10 +53,6 @@ type Publisher = Event
type Option func(*Options)
var (
HeaderPrefix = "Micro-"
)
// NewService creates and returns a new Service based on the packages within.
func NewService(opts ...Option) Service {
return newService(opts...)

View File

@ -20,6 +20,7 @@ import (
"github.com/asim/go-micro/v3/codec"
merrors "github.com/asim/go-micro/v3/errors"
"github.com/asim/go-micro/v3/logger"
)
var (
@ -140,7 +141,7 @@ func prepareMethod(method reflect.Method) *methodType {
replyType = mtype.In(3)
contextType = mtype.In(1)
default:
log.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
logger.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
return nil
}
@ -148,7 +149,7 @@ func prepareMethod(method reflect.Method) *methodType {
// check stream type
streamType := reflect.TypeOf((*Stream)(nil)).Elem()
if !argType.Implements(streamType) {
log.Errorf("%v argument does not implement Stream interface: %v", mname, argType)
logger.Errorf("%v argument does not implement Stream interface: %v", mname, argType)
return nil
}
} else {
@ -156,30 +157,30 @@ func prepareMethod(method reflect.Method) *methodType {
// First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) {
log.Errorf("%v argument type not exported: %v", mname, argType)
logger.Errorf("%v argument type not exported: %v", mname, argType)
return nil
}
if replyType.Kind() != reflect.Ptr {
log.Errorf("method %v reply type not a pointer: %v", mname, replyType)
logger.Errorf("method %v reply type not a pointer: %v", mname, replyType)
return nil
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
log.Errorf("method %v reply type not exported: %v", mname, replyType)
logger.Errorf("method %v reply type not exported: %v", mname, replyType)
return nil
}
}
// Method needs one out.
if mtype.NumOut() != 1 {
log.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
logger.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
return nil
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
log.Errorf("method %v returns %v not error", mname, returnType.String())
logger.Errorf("method %v returns %v not error", mname, returnType.String())
return nil
}
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
@ -508,8 +509,8 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro
defer func() {
// recover any panics
if r := recover(); r != nil {
log.Errorf("panic recovered: %v", r)
log.Error(string(debug.Stack()))
logger.Errorf("panic recovered: %v", r)
logger.Error(string(debug.Stack()))
err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r)
}
}()

View File

@ -161,9 +161,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// recover any panics
if r := recover(); r != nil {
if logger.V(logger.ErrorLevel, log) {
log.Error("panic recovered: ", r)
log.Error(string(debug.Stack()))
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
}
}
}()
@ -383,9 +383,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// recover any panics for outbound process
if r := recover(); r != nil {
if logger.V(logger.ErrorLevel, log) {
log.Error("panic recovered: ", r)
log.Error(string(debug.Stack()))
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
}
}
}()
@ -414,8 +414,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// recover any panics for call handler
if r := recover(); r != nil {
log.Error("panic recovered: ", r)
log.Error(string(debug.Stack()))
logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
}
}()
@ -434,7 +434,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// could not write error response
if writeError != nil && !alreadyClosed {
log.Debugf("rpc: unable to write error response: %v", writeError)
logger.Debugf("rpc: unable to write error response: %v", writeError)
}
}
}(id, psock)
@ -651,7 +651,7 @@ func (s *rpcServer) Register() error {
if !registered {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
log.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
}
@ -703,7 +703,7 @@ func (s *rpcServer) Register() error {
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
log.Infof("Subscribing to topic: %s", sub.Topic())
logger.Infof("Subscribing to topic: %s", sub.Topic())
}
s.subscribers[sb] = []broker.Subscriber{sub}
}
@ -764,7 +764,7 @@ func (s *rpcServer) Deregister() error {
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
log.Infof("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
logger.Infof("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
}
if err := config.Registry.Deregister(service); err != nil {
return err
@ -789,7 +789,7 @@ func (s *rpcServer) Deregister() error {
for sb, subs := range s.subscribers {
for _, sub := range subs {
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
log.Infof("Unsubscribing %s from topic: %s", node.Id, sub.Topic())
logger.Infof("Unsubscribing %s from topic: %s", node.Id, sub.Topic())
}
sub.Unsubscribe()
}
@ -817,7 +817,7 @@ func (s *rpcServer) Start() error {
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
log.Infof("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
logger.Infof("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
}
// swap address
@ -831,25 +831,25 @@ func (s *rpcServer) Start() error {
// connect to the broker
if err := config.Broker.Connect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Broker [%s] connect error: %v", bname, err)
logger.Errorf("Broker [%s] connect error: %v", bname, err)
}
return err
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
log.Infof("Broker [%s] Connected to %s", bname, config.Broker.Address())
logger.Infof("Broker [%s] Connected to %s", bname, config.Broker.Address())
}
// use RegisterCheck func before register
if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
} else {
// announce self to the world
if err = s.Register(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
}
@ -872,7 +872,7 @@ func (s *rpcServer) Start() error {
default:
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Accept error: %v", err)
logger.Errorf("Accept error: %v", err)
}
time.Sleep(time.Second)
continue
@ -907,23 +907,23 @@ func (s *rpcServer) Start() error {
rerr := s.opts.RegisterCheck(s.opts.Context)
if rerr != nil && registered {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
}
// deregister self in case of error
if err := s.Deregister(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
}
} else if rerr != nil && !registered {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
continue
}
if err := s.Register(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
// wait for exit
@ -941,7 +941,7 @@ func (s *rpcServer) Start() error {
// deregister self
if err := s.Deregister(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
}
}
@ -959,12 +959,12 @@ func (s *rpcServer) Start() error {
ch <- ts.Close()
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
log.Infof("Broker [%s] Disconnected from %s", bname, config.Broker.Address())
logger.Infof("Broker [%s] Disconnected from %s", bname, config.Broker.Address())
}
// disconnect the broker
if err := config.Broker.Disconnect(); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
log.Errorf("Broker [%s] Disconnect error: %v", bname, err)
logger.Errorf("Broker [%s] Disconnect error: %v", bname, err)
}
}

View File

@ -149,7 +149,6 @@ var (
// NewServer creates a new server
NewServer func(...Option) Server = newRpcServer
log = logger.NewHelper(logger.DefaultLogger).WithFields(map[string]interface{}{"service": "server"})
)
// DefaultOptions returns config options for the default service
@ -212,8 +211,8 @@ func Run() error {
ch := make(chan os.Signal, 1)
signal.Notify(ch, signalutil.Shutdown()...)
if logger.V(logger.InfoLevel, log) {
log.Infof("Received signal %s", <-ch)
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Received signal %s", <-ch)
}
return Stop()
}
@ -221,16 +220,16 @@ func Run() error {
// Start starts the default server
func Start() error {
config := DefaultServer.Options()
if logger.V(logger.InfoLevel, log) {
log.Infof("Starting server %s id %s", config.Name, config.Id)
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Starting server %s id %s", config.Name, config.Id)
}
return DefaultServer.Start()
}
// Stop stops the default server
func Stop() error {
if logger.V(logger.InfoLevel, log) {
log.Infof("Stopping server")
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Stopping server")
}
return DefaultServer.Stop()
}