1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-08-04 21:42:57 +02:00

support hystrix filter (#2265)

This commit is contained in:
Johnson C
2021-09-13 16:23:26 +08:00
committed by GitHub
parent 6c3a5c161f
commit 22409c8ff3
8 changed files with 308 additions and 41 deletions

View File

@@ -0,0 +1,78 @@
# Micro Hystrix Client Wrapper
A go-micro plugin for go-hystrix.
## Usage
```
package main
import (
"github.com/asim/go-micro/v3"
hystrix "github.com/asim/go-micro/plugins/wrapper/breaker/hystrix/v3"
)
func main() {
service := micro.NewService(micro.WrapClient(hystrix.NewClientWrapper()))
service.Init(micro.Name("test.srv"), micro.Address(":80"))
if err := service.Run(); err != nil {
panic(err)
}
}
```
## Filter
```
package main
import (
"github.com/asim/go-micro/v3"
hystrix "github.com/asim/go-micro/plugins/wrapper/breaker/hystrix/v3"
)
func main() {
service := micro.NewService(micro.WrapClient(hystrix.NewClientWrapper(hystrix.WithFilter(func(c context.Context, e error) error {
if e == ErrLetItPass {
return nil
}
return e
}))))
service.Init(micro.Name("test.srv"), micro.Address(":80"))
if err := service.Run(); err != nil {
panic(err)
}
}
```
## Default Configure in hystrix
```
var (
// DefaultTimeout is how long to wait for command to complete, in milliseconds
DefaultTimeout = 1000
// DefaultMaxConcurrent is how many commands of the same type can run at the same time
DefaultMaxConcurrent = 10
// DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health
DefaultVolumeThreshold = 20
// DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery
DefaultSleepWindow = 5000
// DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
DefaultErrorPercentThreshold = 50
)
```
# Update default config in hystrix
```
package main
import (
"github.com/asim/go-micro/v3"
hystrix "github.com/asim/go-micro/plugins/wrapper/breaker/hystrix/v3"
)
func main() {
hystrix.ConfigureDefault(hystrix.CommandConfig{Timeout: 1000})
service := micro.NewService(micro.WrapClient(hystrix.NewClientWrapper()))
service.Init(micro.Name("test.srv"), micro.Address(":80"))
if err := service.Run(); err != nil {
panic(err)
}
}
```

View File

@@ -0,0 +1,51 @@
package hystrix
import (
"github.com/afex/hystrix-go/hystrix"
)
// CommandConfig is used to tune circuit settings at runtime
type CommandConfig struct {
Timeout int
MaxConcurrentRequests int
RequestVolumeThreshold int
SleepWindow int
ErrorPercentThreshold int
}
// Configure applies settings for a set of circuits
func Configure(cmds map[string]CommandConfig) {
for k, v := range cmds {
ConfigureCommand(k, v)
}
}
// ConfigureCommand applies settings for a circuit
func ConfigureCommand(name string, config CommandConfig) {
hystrix.ConfigureCommand(name, hystrix.CommandConfig{
Timeout: config.Timeout,
MaxConcurrentRequests: config.MaxConcurrentRequests,
RequestVolumeThreshold: config.RequestVolumeThreshold,
SleepWindow: config.SleepWindow,
ErrorPercentThreshold: config.ErrorPercentThreshold,
})
}
// ConfigureDefault applies default settings for all circuits
func ConfigureDefault(config CommandConfig) {
if config.Timeout != 0 {
hystrix.DefaultTimeout = config.Timeout
}
if config.MaxConcurrentRequests != 0 {
hystrix.DefaultMaxConcurrent = config.MaxConcurrentRequests
}
if config.RequestVolumeThreshold != 0 {
hystrix.DefaultVolumeThreshold = config.RequestVolumeThreshold
}
if config.SleepWindow != 0 {
hystrix.DefaultSleepWindow = config.SleepWindow
}
if config.ErrorPercentThreshold != 0 {
hystrix.DefaultErrorPercentThreshold = config.ErrorPercentThreshold
}
}

View File

@@ -0,0 +1,51 @@
package hystrix
import (
"testing"
"time"
"github.com/afex/hystrix-go/hystrix"
)
func TestConfigure(t *testing.T) {
command, timeout := "testing.configure", 200
Configure(map[string]CommandConfig{command: {Timeout: timeout}})
configures := hystrix.GetCircuitSettings()
if c, ok := configures[command]; !ok || c.Timeout != time.Duration(timeout)*time.Millisecond {
t.Fail()
}
}
func TestConfigureCommand(t *testing.T) {
command, timeout := "testing.configureCommand", 300
ConfigureCommand(command, CommandConfig{Timeout: timeout})
configures := hystrix.GetCircuitSettings()
if c, ok := configures[command]; !ok || c.Timeout != time.Duration(timeout)*time.Millisecond {
t.Fail()
}
}
func TestConfigureDefault(t *testing.T) {
timeout, maxConcurrent, reqThreshold, sleepWindow, errThreshold := 100, 20, 10, 500, 5
ConfigureDefault(CommandConfig{
Timeout: timeout,
MaxConcurrentRequests: maxConcurrent,
RequestVolumeThreshold: reqThreshold,
SleepWindow: sleepWindow,
ErrorPercentThreshold: errThreshold})
if hystrix.DefaultTimeout != timeout {
t.Fail()
}
if hystrix.DefaultVolumeThreshold != reqThreshold {
t.Fail()
}
if hystrix.DefaultMaxConcurrent != maxConcurrent {
t.Fail()
}
if hystrix.DefaultSleepWindow != sleepWindow {
t.Fail()
}
if hystrix.DefaultErrorPercentThreshold != errThreshold {
t.Fail()
}
}

View File

@@ -4,11 +4,5 @@ go 1.16
require (
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/asim/go-micro/plugins/registry/memory/v3 v3.0.0-20210630062103-c13bb07171bc
github.com/asim/go-micro/v3 v3.5.2-0.20210630062103-c13bb07171bc
)
replace (
github.com/asim/go-micro/plugins/registry/memory/v3 => ../../../../plugins/registry/memory
github.com/asim/go-micro/v3 => ../../../../../go-micro
github.com/asim/go-micro/v3 v3.6.0
)

View File

@@ -65,6 +65,8 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asim/go-micro/v3 v3.6.0 h1:I6UVJBpBtWNKCjWf0dRpZznRCW9TR4DXjV4wieyGhK0=
github.com/asim/go-micro/v3 v3.6.0/go.mod h1:cNGIIYQcp0qy+taNYmrBdaIHeqMWHV5ZH/FfQzfOyE8=
github.com/aws/aws-sdk-go v1.37.27/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=

View File

@@ -9,29 +9,34 @@ import (
type clientWrapper struct {
client.Client
fallback func(error) error
filter func(context.Context, error) error
fallback func(context.Context, error) error
}
func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
return hystrix.Do(req.Service()+"."+req.Endpoint(), func() error {
return c.Client.Call(ctx, req, rsp, opts...)
}, c.fallback)
func (cw *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
var err error
herr := hystrix.DoC(ctx, req.Service()+"."+req.Endpoint(), func(c context.Context) error {
err = cw.Client.Call(c, req, rsp, opts...)
if cw.filter != nil {
// custom error handling, filter errors that should not trigger circuit breaker
return cw.filter(ctx, err)
}
return err
}, cw.fallback)
if herr != nil {
return herr
}
// return original error
return err
}
// NewClientWrapper returns a hystrix client Wrapper.
func NewClientWrapper(fallbacks ...func(error) error) client.Wrapper {
func NewClientWrapper(opts ...Option) client.Wrapper {
var options Options
for _, o := range opts {
o(&options)
}
return func(c client.Client) client.Client {
return &clientWrapper{c, resolveFallback(fallbacks)}
}
}
func resolveFallback(fallbacks []func(error) error) func(error) error {
switch len(fallbacks) {
case 0:
return nil
case 1:
return fallbacks[0]
default:
panic("too many fallback parameters")
return &clientWrapper{c, options.Filter, options.Fallback}
}
}

View File

@@ -2,32 +2,26 @@ package hystrix
import (
"context"
"fmt"
"github.com/afex/hystrix-go/hystrix"
"github.com/asim/go-micro/plugins/registry/memory/v3"
"github.com/asim/go-micro/v3/client"
"github.com/asim/go-micro/v3/selector"
"errors"
"strings"
"testing"
)
func fallbackEvent() func(error) error {
return func(err error) error {
// You can set up webhook event messages here
fmt.Println("publish event message")
return err
}
}
"github.com/afex/hystrix-go/hystrix"
"github.com/asim/go-micro/v3/client"
merrors "github.com/asim/go-micro/v3/errors"
"github.com/asim/go-micro/v3/registry"
"github.com/asim/go-micro/v3/selector"
)
func TestBreaker(t *testing.T) {
// setup
r := memory.NewRegistry()
r := registry.NewMemoryRegistry()
s := selector.NewSelector(selector.Registry(r))
c := client.NewClient(
// set the selector
client.Selector(s),
// add the breaker wrapper
client.Wrap(NewClientWrapper(fallbackEvent())),
client.Wrap(NewClientWrapper()),
)
req := c.NewRequest("test.service", "Test.Method", map[string]string{
@@ -50,3 +44,69 @@ func TestBreaker(t *testing.T) {
t.Errorf("Expecting tripped breaker, got %v", err)
}
}
func TestBreakerWithFilter(t *testing.T) {
r := registry.NewMemoryRegistry()
s := selector.NewSelector(selector.Registry(r))
c := client.NewClient(
client.Selector(s),
client.Wrap(NewClientWrapper(WithFilter(func(c context.Context, e error) error {
var merr *merrors.Error
if errors.As(e, &merr) && merr.Detail == "service test.service: not found" {
return nil
}
return e
}))),
)
req := c.NewRequest("test.service", "Test.FilterMethod", nil)
for i := 0; i < (hystrix.DefaultVolumeThreshold); i++ {
c.Call(context.TODO(), req, nil)
}
circuit, _, _ := hystrix.GetCircuit("test.service.Test.FilterMethod")
if circuit.IsOpen() {
t.Errorf("breaker should not be opened")
}
err := c.Call(context.TODO(), req, nil)
if err == nil {
t.Error("original error should be throw out")
}
}
func TestBreakerWithFallback(t *testing.T) {
r := registry.NewMemoryRegistry()
s := selector.NewSelector(selector.Registry(r))
c := client.NewClient(
client.Selector(s),
client.Wrap(NewClientWrapper(WithFallback(func(c context.Context, e error) error {
var merr *merrors.Error
if errors.As(e, &merr) && merr.Detail == "service test.service: not found" {
return hystrix.ErrCircuitOpen
}
return e
}))),
)
// trigger fallback to open circuit breaker
req := c.NewRequest("test.service", "Test.FallbackMethod", nil)
for i := 0; i < (hystrix.DefaultVolumeThreshold); i++ {
c.Call(context.TODO(), req, nil)
}
err := c.Call(context.TODO(), req, nil)
if err == nil || !strings.HasPrefix(err.Error(), "fallback failed with 'hystrix: circuit open'") {
t.Error("fallback-failure error should be throw out")
return
}
circuit, _, _ := hystrix.GetCircuit("test.service.Test.FallbackMethod")
if !circuit.IsOpen() {
t.Errorf("breaker should be opened")
}
err = c.Call(context.TODO(), req, nil)
if err == nil {
t.Error("original error should be throw out")
}
}

View File

@@ -0,0 +1,26 @@
package hystrix
import "context"
// Options represents hystrix client wrapper options
type Options struct {
Filter func(context.Context, error) error
Fallback func(context.Context, error) error
}
// Option represents options update func
type Option func(*Options)
// WithFilter used to set filter func for options
func WithFilter(filter func(context.Context, error) error) Option {
return func(o *Options) {
o.Filter = filter
}
}
// WithFallback used to set fallback func for options
func WithFallback(fallback func(context.Context, error) error) Option {
return func(o *Options) {
o.Fallback = fallback
}
}