1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-02-05 13:15:11 +02:00

fix: polaris get service name (#2615)

This commit is contained in:
包子 2023-02-20 22:56:48 +08:00 committed by GitHub
parent 0a076443cb
commit 27eadd83b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 63 deletions

View File

@ -16,6 +16,7 @@ type Polaris struct {
registry polaris.ProviderAPI registry polaris.ProviderAPI
discovery polaris.ConsumerAPI discovery polaris.ConsumerAPI
namespace string namespace string
service string
} }
// Option is polaris option. // Option is polaris option.
@ -28,6 +29,13 @@ func WithNamespace(ns string) Option {
} }
} }
// WithService set the current service name
func WithService(service string) Option {
return func(o *Polaris) {
o.service = service
}
}
// New polaris Service governance. // New polaris Service governance.
func New(sdk api.SDKContext, opts ...Option) Polaris { func New(sdk api.SDKContext, opts ...Option) Polaris {
op := Polaris{ op := Polaris{
@ -78,14 +86,15 @@ func (p *Polaris) Registry(opts ...RegistryOption) (r *Registry) {
} }
} }
func (p *Polaris) Limiter(opts ...LimiterOption) (r *Limiter) { func (p *Polaris) Limiter(opts ...LimiterOption) (r Limiter) {
op := limiterOptions{ op := limiterOptions{
namespace: p.namespace, namespace: p.namespace,
service: p.service,
} }
for _, option := range opts { for _, option := range opts {
option(&op) option(&op)
} }
return &Limiter{ return Limiter{
limitAPI: p.limit, limitAPI: p.limit,
opts: op, opts: op,
} }

View File

@ -19,7 +19,7 @@ var (
ErrLimitExceed = errors.New(429, "RATELIMIT", "service unavailable due to rate limit exceeded") ErrLimitExceed = errors.New(429, "RATELIMIT", "service unavailable due to rate limit exceeded")
) )
// Ratelimit ratelimiter middleware // Ratelimit Request rate limit middleware
func Ratelimit(l Limiter) middleware.Middleware { func Ratelimit(l Limiter) middleware.Middleware {
return func(handler middleware.Handler) middleware.Handler { return func(handler middleware.Handler) middleware.Handler {
return func(ctx context.Context, req interface{}) (reply interface{}, err error) { return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
@ -47,7 +47,7 @@ func Ratelimit(l Limiter) middleware.Middleware {
done(ratelimit.DoneInfo{Err: err}) done(ratelimit.DoneInfo{Err: err})
return return
} }
return nil, errors.New(400, "Error with transport.FromServerContext", "Error with transport.FromServerContext") return reply, nil
} }
} }
} }

View File

@ -21,71 +21,89 @@ import (
"github.com/go-kratos/kratos/v2/transport/http" "github.com/go-kratos/kratos/v2/transport/http"
) )
type router struct {
service string
}
type RouterOption func(o *router)
// WithRouterService set the caller service name used by the route
func WithRouterService(service string) RouterOption {
return func(o *router) {
o.service = service
}
}
// NodeFilter polaris dynamic router selector // NodeFilter polaris dynamic router selector
func (p *Polaris) NodeFilter() selector.NodeFilter { func (p *Polaris) NodeFilter(opts ...RouterOption) selector.NodeFilter {
o := router{service: p.service}
for _, opt := range opts {
opt(&o)
}
return func(ctx context.Context, nodes []selector.Node) []selector.Node { return func(ctx context.Context, nodes []selector.Node) []selector.Node {
if len(nodes) == 0 { if len(nodes) == 0 {
return nodes return nodes
} }
if appInfo, ok := kratos.FromContext(ctx); ok { req := &polaris.ProcessRoutersRequest{
req := &polaris.ProcessRoutersRequest{ ProcessRoutersRequest: model.ProcessRoutersRequest{
ProcessRoutersRequest: model.ProcessRoutersRequest{ SourceService: model.ServiceInfo{Namespace: p.namespace, Service: o.service},
SourceService: model.ServiceInfo{ DstInstances: buildPolarisInstance(p.namespace, nodes),
Service: appInfo.Name(), },
Namespace: p.namespace,
},
DstInstances: buildPolarisInstance(p.namespace, nodes),
},
}
req.AddArguments(model.BuildCallerServiceArgument(p.namespace, appInfo.Name()))
// process transport
if tr, ok := transport.FromServerContext(ctx); ok {
req.AddArguments(model.BuildMethodArgument(tr.Operation()))
req.AddArguments(model.BuildPathArgument(tr.Operation()))
for _, key := range tr.RequestHeader().Keys() {
req.AddArguments(model.BuildHeaderArgument(key, tr.RequestHeader().Get(key)))
}
// http
if ht, ok := tr.(http.Transporter); ok {
req.AddArguments(model.BuildPathArgument(ht.Request().URL.Path))
req.AddArguments(model.BuildCallerIPArgument(ht.Request().RemoteAddr))
// cookie
for _, cookie := range ht.Request().Cookies() {
req.AddArguments(model.BuildCookieArgument(cookie.Name, cookie.Value))
}
// url query
for key, values := range ht.Request().URL.Query() {
req.AddArguments(model.BuildQueryArgument(key, strings.Join(values, ",")))
}
}
}
n := make(map[string]selector.Node, len(nodes))
for _, node := range nodes {
n[node.Address()] = node
}
m, err := p.router.ProcessRouters(req)
if err != nil {
log.Errorf("polaris process routers failed, err=%v", err)
return nodes
}
newNode := make([]selector.Node, 0, len(m.Instances))
for _, ins := range m.GetInstances() {
if v, ok := n[fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())]; ok {
newNode = append(newNode, v)
}
}
return newNode
} }
return nodes if appInfo, ok := kratos.FromContext(ctx); ok {
req.SourceService.Service = appInfo.Name()
}
req.AddArguments(model.BuildCallerServiceArgument(p.namespace, req.ProcessRoutersRequest.SourceService.Service))
// process transport
if tr, ok := transport.FromClientContext(ctx); ok {
req.AddArguments(model.BuildMethodArgument(tr.Operation()))
req.AddArguments(model.BuildPathArgument(tr.Operation()))
for _, key := range tr.RequestHeader().Keys() {
req.AddArguments(model.BuildHeaderArgument(strings.ToLower(key), tr.RequestHeader().Get(key)))
}
// http
if ht, ok := tr.(http.Transporter); ok {
req.AddArguments(model.BuildPathArgument(ht.Request().URL.Path))
req.AddArguments(model.BuildCallerIPArgument(ht.Request().RemoteAddr))
// cookie
for _, cookie := range ht.Request().Cookies() {
req.AddArguments(model.BuildCookieArgument(cookie.Name, cookie.Value))
}
// url query
for key, values := range ht.Request().URL.Query() {
req.AddArguments(model.BuildQueryArgument(key, strings.Join(values, ",")))
}
}
}
n := make(map[string]selector.Node, len(nodes))
for _, node := range nodes {
n[node.Address()] = node
}
m, err := p.router.ProcessRouters(req)
if err != nil {
log.Errorf("polaris process routers failed, err=%v", err)
return nodes
}
newNode := make([]selector.Node, 0, len(m.Instances))
for _, ins := range m.GetInstances() {
if v, ok := n[fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())]; ok {
newNode = append(newNode, v)
}
}
if len(newNode) == 0 {
return nodes
}
return newNode
} }
} }