1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-01-22 03:38:41 +02:00
kratos/transport/grpc/client.go

132 lines
3.6 KiB
Go
Raw Normal View History

2021-02-17 17:14:47 +08:00
package grpc
import (
"context"
"time"
"github.com/go-kratos/kratos/v2/middleware"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport"
"github.com/go-kratos/kratos/v2/transport/grpc/resolver/discovery"
2021-05-20 23:30:50 +08:00
// init resolver
_ "github.com/go-kratos/kratos/v2/transport/grpc/resolver/direct"
2021-02-17 17:14:47 +08:00
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
2021-02-17 17:14:47 +08:00
)
// ClientOption is a functional option that mutates the clientOptions
// collected before a gRPC connection is dialed.
type ClientOption func(*clientOptions)
// WithEndpoint returns a ClientOption that records endpoint as the dial
// target of the client.
func WithEndpoint(endpoint string) ClientOption {
	return func(co *clientOptions) { co.endpoint = endpoint }
}
// WithTimeout returns a ClientOption that stores timeout on the client
// options, replacing whatever value was set before it.
func WithTimeout(timeout time.Duration) ClientOption {
	apply := func(co *clientOptions) {
		co.timeout = timeout
	}
	return apply
}
// WithMiddleware with client middleware.
2021-05-20 23:30:50 +08:00
func WithMiddleware(m ...middleware.Middleware) ClientOption {
2021-02-17 17:14:47 +08:00
return func(o *clientOptions) {
2021-05-20 23:30:50 +08:00
o.middleware = middleware.Chain(m...)
2021-02-17 17:14:47 +08:00
}
}
2021-03-09 22:02:18 +08:00
// WithDiscovery with client discovery.
func WithDiscovery(d registry.Discovery) ClientOption {
2021-02-17 17:14:47 +08:00
return func(o *clientOptions) {
o.discovery = d
2021-02-17 17:14:47 +08:00
}
}
2021-06-01 21:19:18 +08:00
// WithUnaryInterceptor returns a ClientOption that sets the additional
// interceptors for unary RPCs. The slice is assigned, not appended, so a
// later use of this option replaces the interceptors set by an earlier one.
func WithUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption {
	apply := func(co *clientOptions) {
		co.ints = in
	}
	return apply
}
2021-02-17 17:14:47 +08:00
// WithOptions returns a ClientOption that sets extra gRPC dial options.
// The slice is assigned, not appended, so a later use of this option
// replaces the dial options set by an earlier one.
func WithOptions(opts ...grpc.DialOption) ClientOption {
	return func(co *clientOptions) { co.grpcOpts = opts }
}
// clientOptions is gRPC Client
type clientOptions struct {
endpoint string
timeout time.Duration
middleware middleware.Middleware
discovery registry.Discovery
2021-06-01 21:19:18 +08:00
ints []grpc.UnaryClientInterceptor
2021-02-17 17:14:47 +08:00
grpcOpts []grpc.DialOption
}
// Dial creates a gRPC client connection from the given options. Unlike
// DialInsecure it does not add the grpc.WithInsecure dial option.
func Dial(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
	const insecure = false
	return dial(ctx, insecure, opts...)
}
// DialInsecure creates a gRPC client connection from the given options with
// the grpc.WithInsecure dial option added.
func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
	const insecure = true
	return dial(ctx, insecure, opts...)
}
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
options := clientOptions{
timeout: 500 * time.Millisecond,
}
for _, o := range opts {
o(&options)
}
2021-06-01 21:19:18 +08:00
var ints = []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout),
}
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
2021-02-17 17:14:47 +08:00
var grpcOpts = []grpc.DialOption{
grpc.WithBalancerName(roundrobin.Name),
2021-06-01 21:19:18 +08:00
grpc.WithChainUnaryInterceptor(ints...),
2021-02-17 17:14:47 +08:00
}
if options.discovery != nil {
grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery)))
2021-02-17 17:14:47 +08:00
}
if insecure {
grpcOpts = append(grpcOpts, grpc.WithInsecure())
}
if len(options.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, options.grpcOpts...)
}
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}
2021-03-05 22:40:38 +08:00
func unaryClientInterceptor(m middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
2021-02-17 17:14:47 +08:00
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
2021-06-01 12:02:22 +08:00
ctx = transport.NewContext(ctx, transport.Transport{Kind: transport.KindGRPC, Endpoint: cc.Target()})
ctx = NewClientContext(ctx, ClientInfo{FullMethod: method})
2021-03-05 22:40:38 +08:00
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
2021-02-17 17:14:47 +08:00
h := func(ctx context.Context, req interface{}) (interface{}, error) {
return reply, invoker(ctx, method, req, reply, cc, opts...)
}
if m != nil {
h = m(h)
}
_, err := h(ctx, req)
return err
}
}