1
0
mirror of https://github.com/labstack/echo.git synced 2025-01-07 23:01:56 +02:00

Fixes the concurrency issue of calling the Next() proxy target on RRB (#2409)

* Fixes the concurrency issue of calling the `Next()` proxy target on round robin balancer

- fixed concurrency issue in `AddTarget()`
- moved `rand.New()` to the random balancer initializer func.
- internal code reorganized eliminating unnecessary pointer redirection
- employing `sync.Mutex` instead of `RWMutex` which brings additional overhead of tracking readers and writers. No need for that since the guarded code has no long-running operations, hence no realistic congestion.
- added additional guards without which the code would otherwise panic (e.g., the case where a random value is calculation when targets list is empty)
- added descriptions for func return values, what to expect in which case.
- Improve code test coverage

---------

Co-authored-by: Becir Basic <bb@neotel.at>
This commit is contained in:
Becir Basic 2023-02-24 19:32:41 +01:00 committed by GitHub
parent 1e575b7b56
commit 5b36ce3612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 22 deletions

View File

@ -12,7 +12,6 @@ import (
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
@ -79,19 +78,20 @@ type (
commonBalancer struct { commonBalancer struct {
targets []*ProxyTarget targets []*ProxyTarget
mutex sync.RWMutex mutex sync.Mutex
} }
// RandomBalancer implements a random load balancing technique. // RandomBalancer implements a random load balancing technique.
randomBalancer struct { randomBalancer struct {
*commonBalancer commonBalancer
random *rand.Rand random *rand.Rand
} }
// RoundRobinBalancer implements a round-robin load balancing technique. // RoundRobinBalancer implements a round-robin load balancing technique.
roundRobinBalancer struct { roundRobinBalancer struct {
*commonBalancer commonBalancer
i uint32 // tracking the index on `targets` slice for the next `*ProxyTarget` to be used
i int
} }
) )
@ -143,32 +143,37 @@ func proxyRaw(t *ProxyTarget, c echo.Context) http.Handler {
// NewRandomBalancer returns a random proxy balancer. // NewRandomBalancer returns a random proxy balancer.
func NewRandomBalancer(targets []*ProxyTarget) ProxyBalancer { func NewRandomBalancer(targets []*ProxyTarget) ProxyBalancer {
b := &randomBalancer{commonBalancer: new(commonBalancer)} b := randomBalancer{}
b.targets = targets b.targets = targets
return b b.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
return &b
} }
// NewRoundRobinBalancer returns a round-robin proxy balancer. // NewRoundRobinBalancer returns a round-robin proxy balancer.
func NewRoundRobinBalancer(targets []*ProxyTarget) ProxyBalancer { func NewRoundRobinBalancer(targets []*ProxyTarget) ProxyBalancer {
b := &roundRobinBalancer{commonBalancer: new(commonBalancer)} b := roundRobinBalancer{}
b.targets = targets b.targets = targets
return b return &b
} }
// AddTarget adds an upstream target to the list. // AddTarget adds an upstream target to the list and returns `true`.
//
// However, if a target with the same name already exists then the operation is aborted returning `false`.
func (b *commonBalancer) AddTarget(target *ProxyTarget) bool { func (b *commonBalancer) AddTarget(target *ProxyTarget) bool {
b.mutex.Lock()
defer b.mutex.Unlock()
for _, t := range b.targets { for _, t := range b.targets {
if t.Name == target.Name { if t.Name == target.Name {
return false return false
} }
} }
b.mutex.Lock()
defer b.mutex.Unlock()
b.targets = append(b.targets, target) b.targets = append(b.targets, target)
return true return true
} }
// RemoveTarget removes an upstream target from the list. // RemoveTarget removes an upstream target from the list by name.
//
// Returns `true` on success, `false` if no target with the name is found.
func (b *commonBalancer) RemoveTarget(name string) bool { func (b *commonBalancer) RemoveTarget(name string) bool {
b.mutex.Lock() b.mutex.Lock()
defer b.mutex.Unlock() defer b.mutex.Unlock()
@ -182,20 +187,36 @@ func (b *commonBalancer) RemoveTarget(name string) bool {
} }
// Next randomly returns an upstream target. // Next randomly returns an upstream target.
//
// Note: `nil` is returned in case upstream target list is empty.
func (b *randomBalancer) Next(c echo.Context) *ProxyTarget { func (b *randomBalancer) Next(c echo.Context) *ProxyTarget {
if b.random == nil { b.mutex.Lock()
b.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) defer b.mutex.Unlock()
if len(b.targets) == 0 {
return nil
} else if len(b.targets) == 1 {
return b.targets[0]
} }
b.mutex.RLock()
defer b.mutex.RUnlock()
return b.targets[b.random.Intn(len(b.targets))] return b.targets[b.random.Intn(len(b.targets))]
} }
// Next returns an upstream target using round-robin technique. // Next returns an upstream target using round-robin technique.
//
// Note: `nil` is returned in case upstream target list is empty.
func (b *roundRobinBalancer) Next(c echo.Context) *ProxyTarget { func (b *roundRobinBalancer) Next(c echo.Context) *ProxyTarget {
b.i = b.i % uint32(len(b.targets)) b.mutex.Lock()
defer b.mutex.Unlock()
if len(b.targets) == 0 {
return nil
} else if len(b.targets) == 1 {
return b.targets[0]
}
// reset the index if out of bounds
if b.i >= len(b.targets) {
b.i = 0
}
t := b.targets[b.i] t := b.targets[b.i]
atomic.AddUint32(&b.i, 1) b.i++
return t return t
} }

View File

@ -122,7 +122,7 @@ func TestProxy(t *testing.T) {
} }
type testProvider struct { type testProvider struct {
*commonBalancer commonBalancer
target *ProxyTarget target *ProxyTarget
err error err error
} }
@ -143,7 +143,7 @@ func TestTargetProvider(t *testing.T) {
url1, _ := url.Parse(t1.URL) url1, _ := url.Parse(t1.URL)
e := echo.New() e := echo.New()
tp := &testProvider{commonBalancer: new(commonBalancer)} tp := &testProvider{}
tp.target = &ProxyTarget{Name: "target 1", URL: url1} tp.target = &ProxyTarget{Name: "target 1", URL: url1}
e.Use(Proxy(tp)) e.Use(Proxy(tp))
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
@ -158,7 +158,7 @@ func TestFailNextTarget(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
e := echo.New() e := echo.New()
tp := &testProvider{commonBalancer: new(commonBalancer)} tp := &testProvider{}
tp.target = &ProxyTarget{Name: "target 1", URL: url1} tp.target = &ProxyTarget{Name: "target 1", URL: url1}
tp.err = echo.NewHTTPError(http.StatusInternalServerError, "method could not select target") tp.err = echo.NewHTTPError(http.StatusInternalServerError, "method could not select target")
@ -422,3 +422,12 @@ func TestClientCancelConnectionResultsHTTPCode499(t *testing.T) {
timeoutStop.Done() timeoutStop.Done()
assert.Equal(t, 499, rec.Code) assert.Equal(t, 499, rec.Code)
} }
// Assert balancer with empty targets does return `nil` on `Next()`
func TestProxyBalancerWithNoTargets(t *testing.T) {
rb := NewRandomBalancer(nil)
assert.Nil(t, rb.Next(nil))
rrb := NewRoundRobinBalancer([]*ProxyTarget{})
assert.Nil(t, rrb.Next(nil))
}