mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-10 00:29:01 +02:00
af3aaf073c
1. bm perf default use engine port 2. add grpc gzip import 3. fix discovery client register addrs bug
342 lines
11 KiB
Go
342 lines
11 KiB
Go
package warden
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"math"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/bilibili/kratos/pkg/conf/dsn"
|
|
"github.com/bilibili/kratos/pkg/log"
|
|
nmd "github.com/bilibili/kratos/pkg/net/metadata"
|
|
"github.com/bilibili/kratos/pkg/net/rpc/warden/ratelimiter"
|
|
"github.com/bilibili/kratos/pkg/net/trace"
|
|
xtime "github.com/bilibili/kratos/pkg/time"
|
|
|
|
//this package is for json format response
|
|
_ "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/encoding/json"
|
|
"github.com/bilibili/kratos/pkg/net/rpc/warden/internal/status"
|
|
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/grpc"
|
|
_ "google.golang.org/grpc/encoding/gzip" // NOTE: use grpc gzip by header grpc-accept-encoding
|
|
"google.golang.org/grpc/keepalive"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/peer"
|
|
"google.golang.org/grpc/reflection"
|
|
)
|
|
|
|
var (
|
|
_grpcDSN string
|
|
_defaultSerConf = &ServerConfig{
|
|
Network: "tcp",
|
|
Addr: "0.0.0.0:9000",
|
|
Timeout: xtime.Duration(time.Second),
|
|
IdleTimeout: xtime.Duration(time.Second * 60),
|
|
MaxLifeTime: xtime.Duration(time.Hour * 2),
|
|
ForceCloseWait: xtime.Duration(time.Second * 20),
|
|
KeepAliveInterval: xtime.Duration(time.Second * 60),
|
|
KeepAliveTimeout: xtime.Duration(time.Second * 20),
|
|
}
|
|
_abortIndex int8 = math.MaxInt8 / 2
|
|
)
|
|
|
|
// ServerConfig is rpc server conf.
|
|
type ServerConfig struct {
|
|
// Network is grpc listen network,default value is tcp
|
|
Network string `dsn:"network"`
|
|
// Addr is grpc listen addr,default value is 0.0.0.0:9000
|
|
Addr string `dsn:"address"`
|
|
// Timeout is context timeout for per rpc call.
|
|
Timeout xtime.Duration `dsn:"query.timeout"`
|
|
// IdleTimeout is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
|
|
// Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
|
|
IdleTimeout xtime.Duration `dsn:"query.idleTimeout"`
|
|
// MaxLifeTime is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
|
|
// A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
|
|
MaxLifeTime xtime.Duration `dsn:"query.maxLife"`
|
|
// ForceCloseWait is an additive period after MaxLifeTime after which the connection will be forcibly closed.
|
|
ForceCloseWait xtime.Duration `dsn:"query.closeWait"`
|
|
// KeepAliveInterval is after a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
|
|
KeepAliveInterval xtime.Duration `dsn:"query.keepaliveInterval"`
|
|
// KeepAliveTimeout is After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
|
|
// the connection is closed.
|
|
KeepAliveTimeout xtime.Duration `dsn:"query.keepaliveTimeout"`
|
|
// LogFlag to control log behaviour. e.g. LogFlag: warden.LogFlagDisableLog.
|
|
// Disable: 1 DisableArgs: 2 DisableInfo: 4
|
|
LogFlag int8 `dsn:"query.logFlag"`
|
|
}
|
|
|
|
// Server is the framework's server side instance, it contains the GrpcServer, interceptor and interceptors.
|
|
// Create an instance of Server, by using NewServer().
|
|
type Server struct {
|
|
conf *ServerConfig
|
|
mutex sync.RWMutex
|
|
|
|
server *grpc.Server
|
|
handlers []grpc.UnaryServerInterceptor
|
|
}
|
|
|
|
// handle return a new unary server interceptor for OpenTracing\Logging\LinkTimeout.
|
|
func (s *Server) handle() grpc.UnaryServerInterceptor {
|
|
return func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
|
var (
|
|
cancel func()
|
|
addr string
|
|
)
|
|
s.mutex.RLock()
|
|
conf := s.conf
|
|
s.mutex.RUnlock()
|
|
// get derived timeout from grpc context,
|
|
// compare with the warden configured,
|
|
// and use the minimum one
|
|
timeout := time.Duration(conf.Timeout)
|
|
if dl, ok := ctx.Deadline(); ok {
|
|
ctimeout := time.Until(dl)
|
|
if ctimeout-time.Millisecond*20 > 0 {
|
|
ctimeout = ctimeout - time.Millisecond*20
|
|
}
|
|
if timeout > ctimeout {
|
|
timeout = ctimeout
|
|
}
|
|
}
|
|
ctx, cancel = context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
// get grpc metadata(trace & remote_ip & color)
|
|
var t trace.Trace
|
|
cmd := nmd.MD{}
|
|
if gmd, ok := metadata.FromIncomingContext(ctx); ok {
|
|
for key, vals := range gmd {
|
|
if nmd.IsIncomingKey(key) {
|
|
cmd[key] = vals[0]
|
|
}
|
|
}
|
|
}
|
|
if t == nil {
|
|
t = trace.New(args.FullMethod)
|
|
} else {
|
|
t.SetTitle(args.FullMethod)
|
|
}
|
|
|
|
if pr, ok := peer.FromContext(ctx); ok {
|
|
addr = pr.Addr.String()
|
|
t.SetTag(trace.String(trace.TagAddress, addr))
|
|
}
|
|
defer t.Finish(&err)
|
|
|
|
// use common meta data context instead of grpc context
|
|
ctx = nmd.NewContext(ctx, cmd)
|
|
ctx = trace.NewContext(ctx, t)
|
|
|
|
resp, err = handler(ctx, req)
|
|
return resp, status.FromError(err).Err()
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
addFlag(flag.CommandLine)
|
|
}
|
|
|
|
func addFlag(fs *flag.FlagSet) {
|
|
v := os.Getenv("GRPC")
|
|
if v == "" {
|
|
v = "tcp://0.0.0.0:9000/?timeout=1s&idle_timeout=60s"
|
|
}
|
|
fs.StringVar(&_grpcDSN, "grpc", v, "listen grpc dsn, or use GRPC env variable.")
|
|
fs.Var(&_grpcTarget, "grpc.target", "usage: -grpc.target=seq.service=127.0.0.1:9000 -grpc.target=fav.service=192.168.10.1:9000")
|
|
}
|
|
|
|
func parseDSN(rawdsn string) *ServerConfig {
|
|
conf := new(ServerConfig)
|
|
d, err := dsn.Parse(rawdsn)
|
|
if err != nil {
|
|
panic(errors.WithMessage(err, fmt.Sprintf("warden: invalid dsn: %s", rawdsn)))
|
|
}
|
|
if _, err = d.Bind(conf); err != nil {
|
|
panic(errors.WithMessage(err, fmt.Sprintf("warden: invalid dsn: %s", rawdsn)))
|
|
}
|
|
return conf
|
|
}
|
|
|
|
// NewServer returns a new blank Server instance with a default server interceptor.
|
|
func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server) {
|
|
if conf == nil {
|
|
if !flag.Parsed() {
|
|
fmt.Fprint(os.Stderr, "[warden] please call flag.Parse() before Init warden server, some configure may not effect\n")
|
|
}
|
|
conf = parseDSN(_grpcDSN)
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "[warden] config is Deprecated, argument will be ignored. please use -grpc flag or GRPC env to configure warden server.\n")
|
|
}
|
|
s = new(Server)
|
|
if err := s.SetConfig(conf); err != nil {
|
|
panic(errors.Errorf("warden: set config failed!err: %s", err.Error()))
|
|
}
|
|
keepParam := grpc.KeepaliveParams(keepalive.ServerParameters{
|
|
MaxConnectionIdle: time.Duration(s.conf.IdleTimeout),
|
|
MaxConnectionAgeGrace: time.Duration(s.conf.ForceCloseWait),
|
|
Time: time.Duration(s.conf.KeepAliveInterval),
|
|
Timeout: time.Duration(s.conf.KeepAliveTimeout),
|
|
MaxConnectionAge: time.Duration(s.conf.MaxLifeTime),
|
|
})
|
|
opt = append(opt, keepParam, grpc.UnaryInterceptor(s.interceptor))
|
|
s.server = grpc.NewServer(opt...)
|
|
s.Use(s.recovery(), s.handle(), serverLogging(conf.LogFlag), s.stats(), s.validate())
|
|
s.Use(ratelimiter.New(nil).Limit())
|
|
return
|
|
}
|
|
|
|
// SetConfig hot reloads server config
|
|
func (s *Server) SetConfig(conf *ServerConfig) (err error) {
|
|
if conf == nil {
|
|
conf = _defaultSerConf
|
|
}
|
|
if conf.Timeout <= 0 {
|
|
conf.Timeout = xtime.Duration(time.Second)
|
|
}
|
|
if conf.IdleTimeout <= 0 {
|
|
conf.IdleTimeout = xtime.Duration(time.Second * 60)
|
|
}
|
|
if conf.MaxLifeTime <= 0 {
|
|
conf.MaxLifeTime = xtime.Duration(time.Hour * 2)
|
|
}
|
|
if conf.ForceCloseWait <= 0 {
|
|
conf.ForceCloseWait = xtime.Duration(time.Second * 20)
|
|
}
|
|
if conf.KeepAliveInterval <= 0 {
|
|
conf.KeepAliveInterval = xtime.Duration(time.Second * 60)
|
|
}
|
|
if conf.KeepAliveTimeout <= 0 {
|
|
conf.KeepAliveTimeout = xtime.Duration(time.Second * 20)
|
|
}
|
|
if conf.Addr == "" {
|
|
conf.Addr = "0.0.0.0:9000"
|
|
}
|
|
if conf.Network == "" {
|
|
conf.Network = "tcp"
|
|
}
|
|
s.mutex.Lock()
|
|
s.conf = conf
|
|
s.mutex.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// interceptor is a single interceptor out of a chain of many interceptors.
|
|
// Execution is done in left-to-right order, including passing of context.
|
|
// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
|
|
// will see context changes of one and two.
|
|
func (s *Server) interceptor(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
|
var (
|
|
i int
|
|
chain grpc.UnaryHandler
|
|
)
|
|
|
|
n := len(s.handlers)
|
|
if n == 0 {
|
|
return handler(ctx, req)
|
|
}
|
|
|
|
chain = func(ic context.Context, ir interface{}) (interface{}, error) {
|
|
if i == n-1 {
|
|
return handler(ic, ir)
|
|
}
|
|
i++
|
|
return s.handlers[i](ic, ir, args, chain)
|
|
}
|
|
|
|
return s.handlers[0](ctx, req, args, chain)
|
|
}
|
|
|
|
// Server return the grpc server for registering service.
|
|
func (s *Server) Server() *grpc.Server {
|
|
return s.server
|
|
}
|
|
|
|
// Use attachs a global inteceptor to the server.
|
|
// For example, this is the right place for a rate limiter or error management inteceptor.
|
|
func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server {
|
|
finalSize := len(s.handlers) + len(handlers)
|
|
if finalSize >= int(_abortIndex) {
|
|
panic("warden: server use too many handlers")
|
|
}
|
|
mergedHandlers := make([]grpc.UnaryServerInterceptor, finalSize)
|
|
copy(mergedHandlers, s.handlers)
|
|
copy(mergedHandlers[len(s.handlers):], handlers)
|
|
s.handlers = mergedHandlers
|
|
return s
|
|
}
|
|
|
|
// Run create a tcp listener and start goroutine for serving each incoming request.
|
|
// Run will return a non-nil error unless Stop or GracefulStop is called.
|
|
func (s *Server) Run(addr string) error {
|
|
lis, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
err = errors.WithStack(err)
|
|
log.Error("failed to listen: %v", err)
|
|
return err
|
|
}
|
|
reflection.Register(s.server)
|
|
return s.Serve(lis)
|
|
}
|
|
|
|
// RunUnix create a unix listener and start goroutine for serving each incoming request.
|
|
// RunUnix will return a non-nil error unless Stop or GracefulStop is called.
|
|
func (s *Server) RunUnix(file string) error {
|
|
lis, err := net.Listen("unix", file)
|
|
if err != nil {
|
|
err = errors.WithStack(err)
|
|
log.Error("failed to listen: %v", err)
|
|
return err
|
|
}
|
|
reflection.Register(s.server)
|
|
return s.Serve(lis)
|
|
}
|
|
|
|
// Start create a new goroutine run server with configured listen addr
|
|
// will panic if any error happend
|
|
// return server itself
|
|
func (s *Server) Start() (*Server, error) {
|
|
lis, err := net.Listen(s.conf.Network, s.conf.Addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log.Info("warden: start grpc listen addr: %v", lis.Addr())
|
|
reflection.Register(s.server)
|
|
go func() {
|
|
if err := s.Serve(lis); err != nil {
|
|
panic(err)
|
|
}
|
|
}()
|
|
return s, nil
|
|
}
|
|
|
|
// Serve accepts incoming connections on the listener lis, creating a new
|
|
// ServerTransport and service goroutine for each.
|
|
// Serve will return a non-nil error unless Stop or GracefulStop is called.
|
|
func (s *Server) Serve(lis net.Listener) error {
|
|
return s.server.Serve(lis)
|
|
}
|
|
|
|
// Shutdown stops the server gracefully. It stops the server from
|
|
// accepting new connections and RPCs and blocks until all the pending RPCs are
|
|
// finished or the context deadline is reached.
|
|
func (s *Server) Shutdown(ctx context.Context) (err error) {
|
|
ch := make(chan struct{})
|
|
go func() {
|
|
s.server.GracefulStop()
|
|
close(ch)
|
|
}()
|
|
select {
|
|
case <-ctx.Done():
|
|
s.server.Stop()
|
|
err = ctx.Err()
|
|
case <-ch:
|
|
}
|
|
return
|
|
}
|