mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-26 03:52:12 +02:00
bc35f20228
* feat(endpoint): add endpoint parser * fix parseTarget ut * fix insecure testing
361 lines
9.5 KiB
Go
361 lines
9.5 KiB
Go
package http
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/go-kratos/kratos/v2/encoding"
|
|
"github.com/go-kratos/kratos/v2/errors"
|
|
"github.com/go-kratos/kratos/v2/internal/endpoint"
|
|
"github.com/go-kratos/kratos/v2/internal/host"
|
|
"github.com/go-kratos/kratos/v2/internal/httputil"
|
|
"github.com/go-kratos/kratos/v2/middleware"
|
|
"github.com/go-kratos/kratos/v2/registry"
|
|
"github.com/go-kratos/kratos/v2/transport"
|
|
"github.com/go-kratos/kratos/v2/transport/http/balancer"
|
|
"github.com/go-kratos/kratos/v2/transport/http/balancer/random"
|
|
)
|
|
|
|
// DecodeErrorFunc is decode error func.
|
|
type DecodeErrorFunc func(ctx context.Context, res *http.Response) error
|
|
|
|
// EncodeRequestFunc is request encode func.
|
|
type EncodeRequestFunc func(ctx context.Context, contentType string, in interface{}) (body []byte, err error)
|
|
|
|
// DecodeResponseFunc is response decode func.
|
|
type DecodeResponseFunc func(ctx context.Context, res *http.Response, out interface{}) error
|
|
|
|
// ClientOption is HTTP client option.
|
|
type ClientOption func(*clientOptions)
|
|
|
|
// Client is an HTTP transport client.
|
|
type clientOptions struct {
|
|
ctx context.Context
|
|
tlsConf *tls.Config
|
|
timeout time.Duration
|
|
endpoint string
|
|
userAgent string
|
|
encoder EncodeRequestFunc
|
|
decoder DecodeResponseFunc
|
|
errorDecoder DecodeErrorFunc
|
|
transport http.RoundTripper
|
|
balancer balancer.Balancer
|
|
discovery registry.Discovery
|
|
middleware []middleware.Middleware
|
|
block bool
|
|
}
|
|
|
|
// WithTransport with client transport.
|
|
func WithTransport(trans http.RoundTripper) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.transport = trans
|
|
}
|
|
}
|
|
|
|
// WithTimeout with client request timeout.
|
|
func WithTimeout(d time.Duration) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.timeout = d
|
|
}
|
|
}
|
|
|
|
// WithUserAgent with client user agent.
|
|
func WithUserAgent(ua string) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.userAgent = ua
|
|
}
|
|
}
|
|
|
|
// WithMiddleware with client middleware.
|
|
func WithMiddleware(m ...middleware.Middleware) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.middleware = m
|
|
}
|
|
}
|
|
|
|
// WithEndpoint with client addr.
|
|
func WithEndpoint(endpoint string) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.endpoint = endpoint
|
|
}
|
|
}
|
|
|
|
// WithRequestEncoder with client request encoder.
|
|
func WithRequestEncoder(encoder EncodeRequestFunc) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.encoder = encoder
|
|
}
|
|
}
|
|
|
|
// WithResponseDecoder with client response decoder.
|
|
func WithResponseDecoder(decoder DecodeResponseFunc) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.decoder = decoder
|
|
}
|
|
}
|
|
|
|
// WithErrorDecoder with client error decoder.
|
|
func WithErrorDecoder(errorDecoder DecodeErrorFunc) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.errorDecoder = errorDecoder
|
|
}
|
|
}
|
|
|
|
// WithDiscovery with client discovery.
|
|
func WithDiscovery(d registry.Discovery) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.discovery = d
|
|
}
|
|
}
|
|
|
|
// WithBalancer with client balancer.
|
|
// Experimental
|
|
// Notice: This type is EXPERIMENTAL and may be changed or removed in a later release.
|
|
func WithBalancer(b balancer.Balancer) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.balancer = b
|
|
}
|
|
}
|
|
|
|
// WithBlock with client block.
|
|
func WithBlock() ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.block = true
|
|
}
|
|
}
|
|
|
|
// WithTLSConfig with tls config.
|
|
func WithTLSConfig(c *tls.Config) ClientOption {
|
|
return func(o *clientOptions) {
|
|
o.tlsConf = c
|
|
}
|
|
}
|
|
|
|
// Client is an HTTP client.
|
|
type Client struct {
|
|
opts clientOptions
|
|
target *Target
|
|
r *resolver
|
|
cc *http.Client
|
|
insecure bool
|
|
}
|
|
|
|
// NewClient returns an HTTP client.
|
|
func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
|
|
options := clientOptions{
|
|
ctx: ctx,
|
|
timeout: 2000 * time.Millisecond,
|
|
encoder: DefaultRequestEncoder,
|
|
decoder: DefaultResponseDecoder,
|
|
errorDecoder: DefaultErrorDecoder,
|
|
transport: http.DefaultTransport,
|
|
balancer: random.New(),
|
|
}
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
if options.tlsConf != nil {
|
|
if tr, ok := options.transport.(*http.Transport); ok {
|
|
tr.TLSClientConfig = options.tlsConf
|
|
}
|
|
}
|
|
insecure := options.tlsConf == nil
|
|
target, err := parseTarget(options.endpoint, insecure)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var r *resolver
|
|
if options.discovery != nil {
|
|
if target.Scheme == "discovery" {
|
|
if r, err = newResolver(ctx, options.discovery, target, options.balancer, options.block, insecure); err != nil {
|
|
return nil, fmt.Errorf("[http client] new resolver failed!err: %v", options.endpoint)
|
|
}
|
|
} else if _, _, err := host.ExtractHostPort(options.endpoint); err != nil {
|
|
return nil, fmt.Errorf("[http client] invalid endpoint format: %v", options.endpoint)
|
|
}
|
|
}
|
|
return &Client{
|
|
opts: options,
|
|
target: target,
|
|
insecure: insecure,
|
|
r: r,
|
|
cc: &http.Client{
|
|
Timeout: options.timeout,
|
|
Transport: options.transport,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// Invoke makes an rpc call procedure for remote service.
|
|
func (client *Client) Invoke(ctx context.Context, method, path string, args interface{}, reply interface{}, opts ...CallOption) error {
|
|
var (
|
|
contentType string
|
|
body io.Reader
|
|
)
|
|
c := defaultCallInfo(path)
|
|
for _, o := range opts {
|
|
if err := o.before(&c); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if args != nil {
|
|
data, err := client.opts.encoder(ctx, c.contentType, args)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
contentType = c.contentType
|
|
body = bytes.NewReader(data)
|
|
}
|
|
url := fmt.Sprintf("%s://%s%s", client.target.Scheme, client.target.Authority, path)
|
|
req, err := http.NewRequest(method, url, body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if contentType != "" {
|
|
req.Header.Set("Content-Type", c.contentType)
|
|
}
|
|
if client.opts.userAgent != "" {
|
|
req.Header.Set("User-Agent", client.opts.userAgent)
|
|
}
|
|
ctx = transport.NewClientContext(ctx, &Transport{
|
|
endpoint: client.opts.endpoint,
|
|
reqHeader: headerCarrier(req.Header),
|
|
operation: c.operation,
|
|
request: req,
|
|
pathTemplate: c.pathTemplate,
|
|
})
|
|
return client.invoke(ctx, req, args, reply, c, opts...)
|
|
}
|
|
|
|
func (client *Client) invoke(ctx context.Context, req *http.Request, args interface{}, reply interface{}, c callInfo, opts ...CallOption) error {
|
|
h := func(ctx context.Context, in interface{}) (interface{}, error) {
|
|
var done func(context.Context, balancer.DoneInfo)
|
|
if client.r != nil {
|
|
var (
|
|
err error
|
|
node *registry.ServiceInstance
|
|
)
|
|
if node, done, err = client.opts.balancer.Pick(ctx); err != nil {
|
|
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error())
|
|
}
|
|
endpoint, err := endpoint.ParseEndpoint(node.Endpoints, "http", !client.insecure)
|
|
if err != nil {
|
|
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error())
|
|
}
|
|
if client.insecure {
|
|
req.URL.Scheme = "http"
|
|
} else {
|
|
req.URL.Scheme = "https"
|
|
}
|
|
req.URL.Host = endpoint
|
|
req.Host = endpoint
|
|
}
|
|
res, err := client.do(ctx, req, c)
|
|
if done != nil {
|
|
done(ctx, balancer.DoneInfo{Err: err})
|
|
}
|
|
if res != nil {
|
|
cs := csAttempt{res: res}
|
|
for _, o := range opts {
|
|
o.after(&c, &cs)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer res.Body.Close()
|
|
if err := client.opts.decoder(ctx, res, reply); err != nil {
|
|
return nil, err
|
|
}
|
|
return reply, nil
|
|
}
|
|
if len(client.opts.middleware) > 0 {
|
|
h = middleware.Chain(client.opts.middleware...)(h)
|
|
}
|
|
_, err := h(ctx, args)
|
|
return err
|
|
}
|
|
|
|
// Do send an HTTP request and decodes the body of response into target.
|
|
// returns an error (of type *Error) if the response status code is not 2xx.
|
|
func (client *Client) Do(req *http.Request, opts ...CallOption) (*http.Response, error) {
|
|
c := defaultCallInfo(req.URL.Path)
|
|
for _, o := range opts {
|
|
if err := o.before(&c); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return client.do(req.Context(), req, c)
|
|
}
|
|
|
|
func (client *Client) do(ctx context.Context, req *http.Request, c callInfo) (*http.Response, error) {
|
|
resp, err := client.cc.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := client.opts.errorDecoder(ctx, resp); err != nil {
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
// Close tears down the Transport and all underlying connections.
|
|
func (client *Client) Close() error {
|
|
if client.r != nil {
|
|
return client.r.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DefaultRequestEncoder is an HTTP request encoder.
|
|
func DefaultRequestEncoder(ctx context.Context, contentType string, in interface{}) ([]byte, error) {
|
|
name := httputil.ContentSubtype(contentType)
|
|
body, err := encoding.GetCodec(name).Marshal(in)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return body, err
|
|
}
|
|
|
|
// DefaultResponseDecoder is an HTTP response decoder.
|
|
func DefaultResponseDecoder(ctx context.Context, res *http.Response, v interface{}) error {
|
|
defer res.Body.Close()
|
|
data, err := ioutil.ReadAll(res.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return CodecForResponse(res).Unmarshal(data, v)
|
|
}
|
|
|
|
// DefaultErrorDecoder is an HTTP error decoder.
|
|
func DefaultErrorDecoder(ctx context.Context, res *http.Response) error {
|
|
if res.StatusCode >= 200 && res.StatusCode <= 299 {
|
|
return nil
|
|
}
|
|
defer res.Body.Close()
|
|
data, err := ioutil.ReadAll(res.Body)
|
|
if err == nil {
|
|
e := new(errors.Error)
|
|
if err = CodecForResponse(res).Unmarshal(data, e); err == nil {
|
|
e.Code = int32(res.StatusCode)
|
|
return e
|
|
}
|
|
}
|
|
return errors.Errorf(res.StatusCode, errors.UnknownReason, err.Error())
|
|
}
|
|
|
|
// CodecForResponse get encoding.Codec via http.Response
|
|
func CodecForResponse(r *http.Response) encoding.Codec {
|
|
codec := encoding.GetCodec(httputil.ContentSubtype(r.Header.Get("Content-Type")))
|
|
if codec != nil {
|
|
return codec
|
|
}
|
|
return encoding.GetCodec("json")
|
|
}
|