1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-01-26 03:52:12 +02:00
kratos/transport/grpc/server.go

193 lines
4.6 KiB
Go
Raw Normal View History

2021-02-17 17:14:47 +08:00
package grpc
import (
"context"
"net"
"net/url"
"sync"
2021-02-17 17:14:47 +08:00
"time"
2021-05-11 16:06:07 +08:00
"github.com/go-kratos/kratos/v2/api/metadata"
ic "github.com/go-kratos/kratos/v2/internal/context"
2021-02-17 17:14:47 +08:00
"github.com/go-kratos/kratos/v2/internal/host"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware"
"github.com/go-kratos/kratos/v2/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
2021-05-17 20:18:53 +08:00
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
2021-02-17 17:14:47 +08:00
)
var _ transport.Server = (*Server)(nil)
2021-05-28 15:30:55 +08:00
var _ transport.Endpointer = (*Server)(nil)
2021-02-17 17:14:47 +08:00
// ServerOption is gRPC server option.
// It is a function that mutates the *Server it receives; NewServer applies
// every supplied option, in order, after installing its defaults.
type ServerOption func(o *Server)
// Network returns a ServerOption that sets the network the server listens
// on. The value is handed to net.Listen when the listener is created.
func Network(network string) ServerOption {
	return func(srv *Server) { srv.network = network }
}
// Address returns a ServerOption that sets the address the server listens
// on. The value is handed to net.Listen when the listener is created.
func Address(addr string) ServerOption {
	return func(srv *Server) { srv.address = addr }
}
// Timeout returns a ServerOption that sets the per-request timeout enforced
// by the server's built-in unary interceptor. A value that is not positive
// disables the timeout.
func Timeout(timeout time.Duration) ServerOption {
	return func(srv *Server) { srv.timeout = timeout }
}
// Logger with server logger.
func Logger(logger log.Logger) ServerOption {
return func(s *Server) {
s.log = log.NewHelper(logger)
2021-02-17 17:14:47 +08:00
}
}
// Middleware with server middleware.
2021-05-20 23:30:50 +08:00
func Middleware(m ...middleware.Middleware) ServerOption {
2021-02-17 17:14:47 +08:00
return func(s *Server) {
2021-05-20 23:30:50 +08:00
s.middleware = middleware.Chain(m...)
2021-02-17 17:14:47 +08:00
}
}
2021-06-01 21:19:18 +08:00
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the server.
2021-06-01 19:59:35 +08:00
func UnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption {
return func(s *Server) {
s.ints = in
}
}
2021-02-17 17:14:47 +08:00
// Options returns a ServerOption that sets additional grpc.ServerOption
// values. NewServer passes them to grpc.NewServer after its own interceptor
// chain option. Applying the option again replaces the previous set.
func Options(opts ...grpc.ServerOption) ServerOption {
	return func(srv *Server) { srv.grpcOpts = opts }
}
// Server is a gRPC server wrapper.
type Server struct {
*grpc.Server
ctx context.Context
2021-02-17 17:14:47 +08:00
lis net.Listener
once sync.Once
err error
2021-02-17 17:14:47 +08:00
network string
address string
endpoint *url.URL
2021-02-17 17:14:47 +08:00
timeout time.Duration
log *log.Helper
middleware middleware.Middleware
2021-06-01 19:59:35 +08:00
ints []grpc.UnaryServerInterceptor
2021-02-17 17:14:47 +08:00
grpcOpts []grpc.ServerOption
health *health.Server
metadata *metadata.Server
2021-02-17 17:14:47 +08:00
}
// NewServer creates a gRPC server by options.
func NewServer(opts ...ServerOption) *Server {
srv := &Server{
network: "tcp",
address: ":0",
timeout: 1 * time.Second,
2021-06-01 22:12:46 +08:00
health: health.NewServer(),
log: log.NewHelper(log.DefaultLogger),
2021-02-17 17:14:47 +08:00
}
for _, o := range opts {
o(srv)
}
2021-06-01 19:59:35 +08:00
var ints = []grpc.UnaryServerInterceptor{
srv.unaryServerInterceptor(),
}
if len(srv.ints) > 0 {
ints = append(ints, srv.ints...)
}
2021-02-17 17:14:47 +08:00
var grpcOpts = []grpc.ServerOption{
2021-06-01 19:59:35 +08:00
grpc.ChainUnaryInterceptor(ints...),
2021-02-17 17:14:47 +08:00
}
if len(srv.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, srv.grpcOpts...)
}
srv.Server = grpc.NewServer(grpcOpts...)
srv.metadata = metadata.NewServer(srv.Server)
2021-05-17 20:18:53 +08:00
// internal register
grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
metadata.RegisterMetadataServer(srv.Server, srv.metadata)
reflection.Register(srv.Server)
2021-02-17 17:14:47 +08:00
return srv
}
// Endpoint return a real address to registry endpoint.
// examples:
// grpc://127.0.0.1:9000?isSecure=false
func (s *Server) Endpoint() (*url.URL, error) {
s.once.Do(func() {
2021-05-28 15:08:25 +08:00
lis, err := net.Listen(s.network, s.address)
if err != nil {
s.err = err
return
}
addr, err := host.Extract(s.address, s.lis)
if err != nil {
lis.Close()
s.err = err
return
2021-05-28 15:08:25 +08:00
}
s.lis = lis
s.endpoint = &url.URL{Scheme: "grpc", Host: addr}
})
if s.err != nil {
return nil, s.err
2021-05-28 15:08:25 +08:00
}
return s.endpoint, nil
2021-02-17 17:14:47 +08:00
}
// Start start the gRPC server.
func (s *Server) Start(ctx context.Context) error {
if _, err := s.Endpoint(); err != nil {
return err
2021-02-17 17:14:47 +08:00
}
s.ctx = ctx
2021-05-28 15:08:25 +08:00
s.log.Infof("[gRPC] server listening on: %s", s.lis.Addr().String())
s.health.Resume()
2021-05-28 15:08:25 +08:00
return s.Serve(s.lis)
2021-02-17 17:14:47 +08:00
}
// Stop stop the gRPC server.
func (s *Server) Stop(ctx context.Context) error {
2021-02-17 17:14:47 +08:00
s.GracefulStop()
s.health.Shutdown()
2021-02-17 17:14:47 +08:00
s.log.Info("[gRPC] server stopping")
return nil
}
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
2021-02-17 17:14:47 +08:00
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx, cancel := ic.Merge(ctx, s.ctx)
defer cancel()
2021-06-01 12:02:22 +08:00
ctx = transport.NewContext(ctx, transport.Transport{Kind: transport.KindGRPC, Endpoint: s.endpoint.String()})
ctx = NewServerContext(ctx, ServerInfo{Server: info.Server, FullMethod: info.FullMethod})
if s.timeout > 0 {
2021-03-05 22:40:38 +08:00
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.timeout)
2021-03-05 22:40:38 +08:00
defer cancel()
}
2021-02-17 17:14:47 +08:00
h := func(ctx context.Context, req interface{}) (interface{}, error) {
return handler(ctx, req)
}
if s.middleware != nil {
h = s.middleware(h)
2021-02-17 17:14:47 +08:00
}
return h(ctx, req)
}
}