mirror of
https://github.com/go-micro/go-micro.git
synced 2026-05-06 19:21:46 +02:00
baeb282cf1
* Initial plan * Enhance cache rate limiting to protect against rolling deployment scenarios Per @asim's feedback, rate limiting now applies to ALL cache refresh attempts (not just error cases) to prevent registry overload during rolling deployments. Changes: - Rate limit ALL refresh attempts using lastRefreshAttempt tracking - Always return stale cache if available (even if expired) when rate limiting - Only return ErrNotFound when no cache exists during rate limit period - Rate limiting happens inside singleflight to avoid race conditions - Update test to reflect new behavior (no retry when stale cache + rate limit) - Enhanced documentation with rolling deployment scenario examples This addresses the scenario where all upstream services expire their cache simultaneously during a downstream rolling deployment, which would previously cause a stampede to the registry under high QPS. Co-authored-by: asim <17530+asim@users.noreply.github.com> * Remove unused failedAttempts and consecutiveFailures fields Per @asim's feedback, these fields are no longer needed since the new rate limiting strategy uses lastRefreshAttempt to track ALL refresh attempts, not just failed ones. Removed: - failedAttempts map[string]time.Time - consecutiveFailures int - All code that sets these fields The rate limiting logic now only uses lastRefreshAttempt, making these fields redundant. All tests continue to pass. Co-authored-by: asim <17530+asim@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: asim <17530+asim@users.noreply.github.com>
531 lines
13 KiB
Go
531 lines
13 KiB
Go
package cache
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"go-micro.dev/v5/logger"
|
|
"go-micro.dev/v5/registry"
|
|
)
|
|
|
|
// mockRegistry is a mock implementation of registry.Registry for testing.
// callCount is accessed with sync/atomic; tests that reconfigure err or
// services mid-run (see TestThrottlingClearedOnSuccess) take mu while doing so.
type mockRegistry struct {
	callCount int32               // number of GetService invocations (atomic)
	delay     time.Duration       // artificial latency applied inside GetService
	err       error               // when non-nil, GetService returns this error
	services  []*registry.Service // result GetService returns on success
	mu        sync.Mutex          // guards mid-test mutation of the fields above
}
|
|
|
|
// Init is a no-op; the mock requires no initialization.
func (m *mockRegistry) Init(...registry.Option) error {
	return nil
}
|
|
|
|
// Options returns empty options; the mock carries no configuration.
func (m *mockRegistry) Options() registry.Options {
	return registry.Options{}
}
|
|
|
|
// Register is a no-op; these tests only exercise lookups.
func (m *mockRegistry) Register(*registry.Service, ...registry.RegisterOption) error {
	return nil
}
|
|
|
|
// Deregister is a no-op; these tests only exercise lookups.
func (m *mockRegistry) Deregister(*registry.Service, ...registry.DeregisterOption) error {
	return nil
}
|
|
|
|
func (m *mockRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
|
|
// Increment call count
|
|
atomic.AddInt32(&m.callCount, 1)
|
|
|
|
// Simulate delay (e.g., network latency)
|
|
if m.delay > 0 {
|
|
time.Sleep(m.delay)
|
|
}
|
|
|
|
// Return error if configured
|
|
if m.err != nil {
|
|
return nil, m.err
|
|
}
|
|
|
|
// Return services
|
|
return m.services, nil
|
|
}
|
|
|
|
// ListServices is unused by these tests and returns nothing.
func (m *mockRegistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
	return nil, nil
}
|
|
|
|
// Watch is not supported by the mock; callers receive an error so the cache
// falls back to TTL-based refresh rather than watch-driven invalidation.
func (m *mockRegistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
	return nil, errors.New("not implemented")
}
|
|
|
|
// String identifies the registry implementation by name.
func (m *mockRegistry) String() string {
	return "mock"
}
|
|
|
|
// getCallCount atomically reports how many times GetService has been invoked.
func (m *mockRegistry) getCallCount() int32 {
	return atomic.LoadInt32(&m.callCount)
}
|
|
|
|
// TestSingleflightPreventsStampede verifies that concurrent requests for the same service
|
|
// only result in a single call to the underlying registry
|
|
func TestSingleflightPreventsStampede(t *testing.T) {
|
|
mock := &mockRegistry{
|
|
delay: 100 * time.Millisecond, // Simulate slow etcd response
|
|
services: []*registry.Service{
|
|
{
|
|
Name: "test.service",
|
|
Version: "1.0.0",
|
|
Nodes: []*registry.Node{
|
|
{Id: "node1", Address: "localhost:9090"},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Type assertion to *cache is necessary to access internal state for verification
|
|
c := New(mock, func(o *Options) {
|
|
o.TTL = time.Minute
|
|
o.Logger = logger.DefaultLogger
|
|
}).(*cache)
|
|
|
|
// Launch 10 concurrent requests for the same service
|
|
const concurrency = 10
|
|
var wg sync.WaitGroup
|
|
wg.Add(concurrency)
|
|
|
|
results := make([][]*registry.Service, concurrency)
|
|
errs := make([]error, concurrency)
|
|
|
|
for i := 0; i < concurrency; i++ {
|
|
go func(idx int) {
|
|
defer wg.Done()
|
|
services, err := c.GetService("test.service")
|
|
results[idx] = services
|
|
errs[idx] = err
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify that only 1 call was made to the underlying registry
|
|
callCount := mock.getCallCount()
|
|
if callCount != 1 {
|
|
t.Errorf("Expected 1 call to registry, got %d", callCount)
|
|
}
|
|
|
|
// Verify all requests got the same result
|
|
for i := 0; i < concurrency; i++ {
|
|
if errs[i] != nil {
|
|
t.Errorf("Request %d failed: %v", i, errs[i])
|
|
}
|
|
if len(results[i]) != 1 {
|
|
t.Errorf("Request %d got %d services, expected 1", i, len(results[i]))
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestSingleflightWithError verifies that when etcd fails, only one request is made
|
|
// and all concurrent callers receive the error
|
|
func TestSingleflightWithError(t *testing.T) {
|
|
expectedErr := errors.New("etcd connection failed")
|
|
mock := &mockRegistry{
|
|
delay: 50 * time.Millisecond,
|
|
err: expectedErr,
|
|
}
|
|
|
|
// Type assertion to *cache is necessary to access internal state for verification
|
|
c := New(mock, func(o *Options) {
|
|
o.TTL = time.Minute
|
|
o.Logger = logger.DefaultLogger
|
|
}).(*cache)
|
|
|
|
// Launch concurrent requests
|
|
const concurrency = 10
|
|
var wg sync.WaitGroup
|
|
wg.Add(concurrency)
|
|
|
|
errs := make([]error, concurrency)
|
|
|
|
for i := 0; i < concurrency; i++ {
|
|
go func(idx int) {
|
|
defer wg.Done()
|
|
_, err := c.GetService("test.service")
|
|
errs[idx] = err
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify that only 1 call was made to the underlying registry
|
|
callCount := mock.getCallCount()
|
|
if callCount != 1 {
|
|
t.Errorf("Expected 1 call to registry even on error, got %d", callCount)
|
|
}
|
|
|
|
// Verify all requests got the error
|
|
for i := 0; i < concurrency; i++ {
|
|
if errs[i] == nil {
|
|
t.Errorf("Request %d should have failed", i)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestStaleCacheOnError verifies that stale cache is returned when registry fails
|
|
func TestStaleCacheOnError(t *testing.T) {
|
|
mock := &mockRegistry{
|
|
services: []*registry.Service{
|
|
{
|
|
Name: "test.service",
|
|
Version: "1.0.0",
|
|
Nodes: []*registry.Node{
|
|
{Id: "node1", Address: "localhost:9090"},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Type assertion to *cache is necessary to access internal state for verification
|
|
c := New(mock, func(o *Options) {
|
|
o.TTL = 100 * time.Millisecond // Short TTL for testing
|
|
o.Logger = logger.DefaultLogger
|
|
}).(*cache)
|
|
|
|
// First request - should populate cache
|
|
services, err := c.GetService("test.service")
|
|
if err != nil {
|
|
t.Fatalf("First request failed: %v", err)
|
|
}
|
|
if len(services) != 1 {
|
|
t.Fatalf("Expected 1 service, got %d", len(services))
|
|
}
|
|
|
|
// Wait for cache to expire
|
|
time.Sleep(150 * time.Millisecond)
|
|
|
|
// Configure mock to fail
|
|
mock.err = errors.New("etcd unavailable")
|
|
|
|
// Second request - should return stale cache despite error
|
|
services, err = c.GetService("test.service")
|
|
if err != nil {
|
|
t.Errorf("Should have returned stale cache, got error: %v", err)
|
|
}
|
|
if len(services) != 1 {
|
|
t.Errorf("Expected stale cache with 1 service, got %d", len(services))
|
|
}
|
|
}
|
|
|
|
// TestCachePenetrationPrevention verifies the complete flow:
|
|
// 1. Cache populated
|
|
// 2. Cache expires
|
|
// 3. Registry fails
|
|
// 4. Concurrent requests don't stampede registry due to rate limiting
|
|
// 5. Stale cache is returned without hitting registry
|
|
func TestCachePenetrationPrevention(t *testing.T) {
|
|
mock := &mockRegistry{
|
|
services: []*registry.Service{
|
|
{
|
|
Name: "test.service",
|
|
Version: "1.0.0",
|
|
Nodes: []*registry.Node{
|
|
{Id: "node1", Address: "localhost:9090"},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Type assertion to *cache is necessary to access internal state for verification
|
|
c := New(mock, func(o *Options) {
|
|
o.TTL = 100 * time.Millisecond
|
|
o.Logger = logger.DefaultLogger
|
|
// Use short retry interval to test rate limiting
|
|
o.MinimumRetryInterval = 5 * time.Second
|
|
}).(*cache)
|
|
|
|
// Initial request to populate cache
|
|
_, err := c.GetService("test.service")
|
|
if err != nil {
|
|
t.Fatalf("Initial request failed: %v", err)
|
|
}
|
|
|
|
initialCalls := mock.getCallCount()
|
|
if initialCalls != 1 {
|
|
t.Fatalf("Expected 1 initial call, got %d", initialCalls)
|
|
}
|
|
|
|
// Wait for cache to expire (but not past retry interval)
|
|
time.Sleep(150 * time.Millisecond)
|
|
|
|
// Configure mock to fail with delay
|
|
mock.err = errors.New("etcd overloaded")
|
|
mock.delay = 100 * time.Millisecond
|
|
|
|
// Launch many concurrent requests (simulating stampede)
|
|
const concurrency = 50
|
|
var wg sync.WaitGroup
|
|
wg.Add(concurrency)
|
|
|
|
successCount := int32(0)
|
|
|
|
for i := 0; i < concurrency; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
services, err := c.GetService("test.service")
|
|
// Should return stale cache without error (rate limiting prevents registry call)
|
|
if err == nil && len(services) > 0 {
|
|
atomic.AddInt32(&successCount, 1)
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify:
|
|
// 1. NO additional calls (rate limiting + stale cache prevented registry access)
|
|
totalCalls := mock.getCallCount()
|
|
if totalCalls != 1 { // only initial call, no retry due to rate limiting
|
|
t.Errorf("Expected 1 total call (rate limiting prevented retry), got %d", totalCalls)
|
|
}
|
|
|
|
// 2. All requests got stale cache (no errors)
|
|
if successCount != concurrency {
|
|
t.Errorf("Expected all %d requests to succeed with stale cache, got %d", concurrency, successCount)
|
|
}
|
|
}
|
|
|
|
// TestThrottlingWithoutStaleCache verifies that the cache throttles requests
|
|
// when there's no stale cache and the registry is failing
|
|
func TestThrottlingWithoutStaleCache(t *testing.T) {
|
|
mock := &mockRegistry{
|
|
err: errors.New("etcd connection failed"),
|
|
}
|
|
|
|
// Create cache with short retry interval for testing
|
|
c := New(mock, func(o *Options) {
|
|
o.TTL = time.Minute
|
|
o.MinimumRetryInterval = 2 * time.Second
|
|
o.Logger = logger.DefaultLogger
|
|
}).(*cache)
|
|
|
|
// First request - should fail and record the attempt
|
|
_, err := c.GetService("test.service")
|
|
if err == nil {
|
|
t.Fatal("Expected error on first request, got nil")
|
|
}
|
|
|
|
callCount1 := mock.getCallCount()
|
|
if callCount1 != 1 {
|
|
t.Fatalf("Expected 1 call on first attempt, got %d", callCount1)
|
|
}
|
|
|
|
// Immediate second request - should be throttled (no registry call)
|
|
_, err = c.GetService("test.service")
|
|
if err == nil {
|
|
t.Fatal("Expected error on throttled request, got nil")
|
|
}
|
|
|
|
callCount2 := mock.getCallCount()
|
|
if callCount2 != 1 {
|
|
t.Errorf("Expected throttling (still 1 call), got %d calls", callCount2)
|
|
}
|
|
|
|
// Wait for retry interval to pass
|
|
time.Sleep(2100 * time.Millisecond)
|
|
|
|
// Third request - should be allowed (makes another registry call)
|
|
_, err = c.GetService("test.service")
|
|
if err == nil {
|
|
t.Fatal("Expected error after retry interval, got nil")
|
|
}
|
|
|
|
callCount3 := mock.getCallCount()
|
|
if callCount3 != 2 {
|
|
t.Errorf("Expected 2 calls after retry interval, got %d", callCount3)
|
|
}
|
|
}
|
|
|
|
// TestThrottlingMultipleConcurrentRequests verifies that throttling works
|
|
// correctly with multiple concurrent requests when there's no stale cache
|
|
func TestThrottlingMultipleConcurrentRequests(t *testing.T) {
|
|
mock := &mockRegistry{
|
|
err: errors.New("etcd overloaded"),
|
|
delay: 50 * time.Millisecond,
|
|
}
|
|
|
|
c := New(mock, func(o *Options) {
|
|
o.TTL = time.Minute
|
|
o.MinimumRetryInterval = 1 * time.Second
|
|
o.Logger = logger.DefaultLogger
|
|
}).(*cache)
|
|
|
|
// First batch - all concurrent requests should result in single call (singleflight)
|
|
const concurrency = 20
|
|
var wg sync.WaitGroup
|
|
wg.Add(concurrency)
|
|
|
|
for i := 0; i < concurrency; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
c.GetService("test.service")
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
callCount1 := mock.getCallCount()
|
|
if callCount1 != 1 {
|
|
t.Errorf("Expected 1 call (singleflight), got %d", callCount1)
|
|
}
|
|
|
|
// Second batch immediately after - should all be throttled (no new calls)
|
|
wg.Add(concurrency)
|
|
for i := 0; i < concurrency; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
c.GetService("test.service")
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
callCount2 := mock.getCallCount()
|
|
if callCount2 != 1 {
|
|
t.Errorf("Expected throttling (still 1 call), got %d", callCount2)
|
|
}
|
|
|
|
// Wait for retry interval
|
|
time.Sleep(1100 * time.Millisecond)
|
|
|
|
// Third batch - should result in one more call
|
|
wg.Add(concurrency)
|
|
for i := 0; i < concurrency; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
c.GetService("test.service")
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
callCount3 := mock.getCallCount()
|
|
if callCount3 != 2 {
|
|
t.Errorf("Expected 2 calls after retry interval, got %d", callCount3)
|
|
}
|
|
}
|
|
|
|
// TestThrottlingDoesNotAffectSuccessfulLookups verifies that throttling
|
|
// doesn't interfere with successful service lookups
|
|
func TestThrottlingDoesNotAffectSuccessfulLookups(t *testing.T) {
|
|
mock := &mockRegistry{
|
|
services: []*registry.Service{
|
|
{
|
|
Name: "test.service",
|
|
Version: "1.0.0",
|
|
Nodes: []*registry.Node{
|
|
{Id: "node1", Address: "localhost:9090"},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
c := New(mock, func(o *Options) {
|
|
o.TTL = 100 * time.Millisecond
|
|
o.MinimumRetryInterval = 2 * time.Second
|
|
o.Logger = logger.DefaultLogger
|
|
}).(*cache)
|
|
|
|
// Multiple successful requests in quick succession
|
|
for i := 0; i < 5; i++ {
|
|
services, err := c.GetService("test.service")
|
|
if err != nil {
|
|
t.Fatalf("Request %d failed: %v", i, err)
|
|
}
|
|
if len(services) != 1 {
|
|
t.Fatalf("Request %d got %d services, expected 1", i, len(services))
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
// First request should hit registry, others should use cache
|
|
callCount := mock.getCallCount()
|
|
if callCount != 1 {
|
|
t.Errorf("Expected 1 call (cached), got %d", callCount)
|
|
}
|
|
}
|
|
|
|
// TestThrottlingClearedOnSuccess verifies that failed attempt tracking
|
|
// is cleared when a subsequent request succeeds
|
|
func TestThrottlingClearedOnSuccess(t *testing.T) {
|
|
mock := &mockRegistry{
|
|
err: errors.New("temporary failure"),
|
|
}
|
|
|
|
c := New(mock, func(o *Options) {
|
|
o.TTL = time.Minute
|
|
o.MinimumRetryInterval = 1 * time.Second
|
|
o.Logger = logger.DefaultLogger
|
|
}).(*cache)
|
|
|
|
// First request fails
|
|
_, err := c.GetService("test.service")
|
|
if err == nil {
|
|
t.Fatal("Expected error on first request")
|
|
}
|
|
|
|
// Second request immediately after should be throttled
|
|
_, err = c.GetService("test.service")
|
|
if err == nil {
|
|
t.Fatal("Expected error on throttled request")
|
|
}
|
|
|
|
callCount1 := mock.getCallCount()
|
|
if callCount1 != 1 {
|
|
t.Errorf("Expected 1 call due to throttling, got %d", callCount1)
|
|
}
|
|
|
|
// Wait for retry interval
|
|
time.Sleep(1100 * time.Millisecond)
|
|
|
|
// Fix the mock to return success
|
|
mock.mu.Lock()
|
|
mock.err = nil
|
|
mock.services = []*registry.Service{
|
|
{
|
|
Name: "test.service",
|
|
Version: "1.0.0",
|
|
Nodes: []*registry.Node{
|
|
{Id: "node1", Address: "localhost:9090"},
|
|
},
|
|
},
|
|
}
|
|
mock.mu.Unlock()
|
|
|
|
// Request should succeed now
|
|
services, err := c.GetService("test.service")
|
|
if err != nil {
|
|
t.Fatalf("Expected success after fix, got error: %v", err)
|
|
}
|
|
if len(services) != 1 {
|
|
t.Fatalf("Expected 1 service, got %d", len(services))
|
|
}
|
|
|
|
// Immediate next request should NOT be throttled (throttling cleared)
|
|
services, err = c.GetService("test.service")
|
|
if err != nil {
|
|
t.Fatalf("Expected cached success, got error: %v", err)
|
|
}
|
|
if len(services) != 1 {
|
|
t.Fatalf("Expected 1 service from cache, got %d", len(services))
|
|
}
|
|
|
|
// Should be 2 calls total (1 failed + 1 success), no additional calls for cached request
|
|
callCount2 := mock.getCallCount()
|
|
if callCount2 != 2 {
|
|
t.Errorf("Expected 2 calls total, got %d", callCount2)
|
|
}
|
|
}
|