1
0
mirror of https://github.com/go-micro/go-micro.git synced 2026-05-06 19:21:46 +02:00
Files
go-micro/registry/cache/cache_test.go
Copilot baeb282cf1 Enhance cache rate limiting to prevent registry overload during rolling deployments (#2824)
* Initial plan

* Enhance cache rate limiting to protect against rolling deployment scenarios

Per @asim's feedback, rate limiting now applies to ALL cache refresh attempts
(not just error cases) to prevent registry overload during rolling deployments.

Changes:
- Rate limit ALL refresh attempts using lastRefreshAttempt tracking
- Always return stale cache if available (even if expired) when rate limiting
- Only return ErrNotFound when no cache exists during rate limit period
- Rate limiting happens inside singleflight to avoid race conditions
- Update test to reflect new behavior (no retry when stale cache + rate limit)
- Enhanced documentation with rolling deployment scenario examples

This addresses the scenario where all upstream services expire their cache
simultaneously during a downstream rolling deployment, which would previously
cause a stampede to the registry under high QPS.

Co-authored-by: asim <17530+asim@users.noreply.github.com>

* Remove unused failedAttempts and consecutiveFailures fields

Per @asim's feedback, these fields are no longer needed since the new
rate limiting strategy uses lastRefreshAttempt to track ALL refresh
attempts, not just failed ones.

Removed:
- failedAttempts map[string]time.Time
- consecutiveFailures int
- All code that sets these fields

The rate limiting logic now only uses lastRefreshAttempt, making these
fields redundant. All tests continue to pass.

Co-authored-by: asim <17530+asim@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: asim <17530+asim@users.noreply.github.com>
2026-01-14 10:57:14 +00:00

531 lines
13 KiB
Go

package cache
import (
"errors"
"sync"
"sync/atomic"
"testing"
"time"
"go-micro.dev/v5/logger"
"go-micro.dev/v5/registry"
)
// mockRegistry is a mock implementation of registry.Registry for testing.
// callCount is accessed only via sync/atomic; err/services/delay are mutated
// by tests (see TestThrottlingClearedOnSuccess, which takes mu while doing so).
type mockRegistry struct {
	callCount int32               // number of GetService calls, read/written atomically
	delay     time.Duration       // artificial latency applied inside GetService
	err       error               // when non-nil, GetService fails with this error
	services  []*registry.Service // result GetService returns on success
	mu        sync.Mutex          // guards test-side mutation of err/services
}
// Init satisfies registry.Registry; the mock ignores all options.
func (m *mockRegistry) Init(_ ...registry.Option) error {
	return nil
}
// Options satisfies registry.Registry and reports an empty option set.
func (m *mockRegistry) Options() registry.Options {
	var opts registry.Options
	return opts
}
// Register is a no-op; the mock does not track registrations.
func (m *mockRegistry) Register(_ *registry.Service, _ ...registry.RegisterOption) error {
	return nil
}
// Deregister is a no-op; the mock does not track registrations.
func (m *mockRegistry) Deregister(_ *registry.Service, _ ...registry.DeregisterOption) error {
	return nil
}
// GetService records the lookup, optionally simulates network latency, and
// then returns either the configured error or the configured service list.
//
// Fix: the original read m.delay/m.err/m.services without holding m.mu, while
// TestThrottlingClearedOnSuccess mutates err/services under m.mu — a data race
// under `go test -race` if a mutation ever overlaps an in-flight lookup. The
// mutable fields are now snapshotted under the lock; the sleep happens outside
// it so a slow "registry" does not serialize concurrent callers on the mutex.
func (m *mockRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
	// Count every lookup, including ones that will fail.
	atomic.AddInt32(&m.callCount, 1)

	// Snapshot mutable state under the lock so test-side writes are race-free.
	m.mu.Lock()
	delay, err, services := m.delay, m.err, m.services
	m.mu.Unlock()

	// Simulate delay (e.g. network latency).
	if delay > 0 {
		time.Sleep(delay)
	}

	// Return error if configured.
	if err != nil {
		return nil, err
	}

	return services, nil
}
// ListServices is unused by these tests and returns an empty result.
func (m *mockRegistry) ListServices(_ ...registry.ListOption) ([]*registry.Service, error) {
	return nil, nil
}
// Watch is unsupported by the mock and always fails.
func (m *mockRegistry) Watch(_ ...registry.WatchOption) (registry.Watcher, error) {
	return nil, errors.New("not implemented")
}
// String identifies this registry implementation.
func (m *mockRegistry) String() string {
	const name = "mock"
	return name
}
// getCallCount reports how many times GetService has been invoked so far.
func (m *mockRegistry) getCallCount() int32 {
	n := atomic.LoadInt32(&m.callCount)
	return n
}
// TestSingleflightPreventsStampede verifies that concurrent requests for the
// same service only result in a single call to the underlying registry.
func TestSingleflightPreventsStampede(t *testing.T) {
	reg := &mockRegistry{
		delay: 100 * time.Millisecond, // simulate a slow etcd response
		services: []*registry.Service{
			{
				Name:    "test.service",
				Version: "1.0.0",
				Nodes: []*registry.Node{
					{Id: "node1", Address: "localhost:9090"},
				},
			},
		},
	}

	// Type assertion to *cache is necessary to access internal state for verification.
	c := New(reg, func(o *Options) {
		o.TTL = time.Minute
		o.Logger = logger.DefaultLogger
	}).(*cache)

	// Fire a burst of identical lookups in parallel.
	const workers = 10
	gotServices := make([][]*registry.Service, workers)
	gotErrs := make([]error, workers)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			gotServices[n], gotErrs[n] = c.GetService("test.service")
		}(i)
	}
	wg.Wait()

	// The singleflight group should have collapsed the burst into one call.
	if n := reg.getCallCount(); n != 1 {
		t.Errorf("Expected 1 call to registry, got %d", n)
	}

	// Every caller must have shared the same successful result.
	for i, err := range gotErrs {
		if err != nil {
			t.Errorf("Request %d failed: %v", i, err)
		}
		if len(gotServices[i]) != 1 {
			t.Errorf("Request %d got %d services, expected 1", i, len(gotServices[i]))
		}
	}
}
// TestSingleflightWithError verifies that when etcd fails, only one request
// is made and all concurrent callers receive the error.
func TestSingleflightWithError(t *testing.T) {
	wantErr := errors.New("etcd connection failed")
	reg := &mockRegistry{
		delay: 50 * time.Millisecond,
		err:   wantErr,
	}

	// Type assertion to *cache is necessary to access internal state for verification.
	c := New(reg, func(o *Options) {
		o.TTL = time.Minute
		o.Logger = logger.DefaultLogger
	}).(*cache)

	// Fire a burst of identical lookups in parallel.
	const workers = 10
	gotErrs := make([]error, workers)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(n int) {
			defer wg.Done()
			_, gotErrs[n] = c.GetService("test.service")
		}(i)
	}
	wg.Wait()

	// Singleflight should have deduplicated the burst even on failure.
	if n := reg.getCallCount(); n != 1 {
		t.Errorf("Expected 1 call to registry even on error, got %d", n)
	}

	// Every caller must observe the failure.
	for i, err := range gotErrs {
		if err == nil {
			t.Errorf("Request %d should have failed", i)
		}
	}
}
// TestStaleCacheOnError verifies that stale cache is returned when the
// registry fails.
func TestStaleCacheOnError(t *testing.T) {
	reg := &mockRegistry{
		services: []*registry.Service{
			{
				Name:    "test.service",
				Version: "1.0.0",
				Nodes: []*registry.Node{
					{Id: "node1", Address: "localhost:9090"},
				},
			},
		},
	}

	// Type assertion to *cache is necessary to access internal state for verification.
	c := New(reg, func(o *Options) {
		o.TTL = 100 * time.Millisecond // short TTL so the entry expires quickly
		o.Logger = logger.DefaultLogger
	}).(*cache)

	// Prime the cache with a successful lookup.
	services, err := c.GetService("test.service")
	if err != nil {
		t.Fatalf("First request failed: %v", err)
	}
	if len(services) != 1 {
		t.Fatalf("Expected 1 service, got %d", len(services))
	}

	// Let the entry go stale, then break the backing registry.
	time.Sleep(150 * time.Millisecond)
	reg.err = errors.New("etcd unavailable")

	// A lookup after expiry should fall back to the stale entry, not fail.
	services, err = c.GetService("test.service")
	if err != nil {
		t.Errorf("Should have returned stale cache, got error: %v", err)
	}
	if len(services) != 1 {
		t.Errorf("Expected stale cache with 1 service, got %d", len(services))
	}
}
// TestCachePenetrationPrevention verifies the complete flow:
//  1. Cache populated
//  2. Cache expires
//  3. Registry fails
//  4. Concurrent requests don't stampede registry due to rate limiting
//  5. Stale cache is returned without hitting registry
func TestCachePenetrationPrevention(t *testing.T) {
	reg := &mockRegistry{
		services: []*registry.Service{
			{
				Name:    "test.service",
				Version: "1.0.0",
				Nodes: []*registry.Node{
					{Id: "node1", Address: "localhost:9090"},
				},
			},
		},
	}

	// Type assertion to *cache is necessary to access internal state for verification.
	c := New(reg, func(o *Options) {
		o.TTL = 100 * time.Millisecond
		o.Logger = logger.DefaultLogger
		// Retry interval far longer than the test run, so every post-expiry
		// refresh attempt lands inside the rate-limit window.
		o.MinimumRetryInterval = 5 * time.Second
	}).(*cache)

	// Prime the cache.
	if _, err := c.GetService("test.service"); err != nil {
		t.Fatalf("Initial request failed: %v", err)
	}
	if n := reg.getCallCount(); n != 1 {
		t.Fatalf("Expected 1 initial call, got %d", n)
	}

	// Let the cache expire while staying inside the retry window.
	time.Sleep(150 * time.Millisecond)

	// Make the registry slow and broken.
	reg.err = errors.New("etcd overloaded")
	reg.delay = 100 * time.Millisecond

	// Simulate a stampede of concurrent lookups.
	const workers = 50
	var ok int32

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			// Rate limiting should serve the stale entry without error.
			if services, err := c.GetService("test.service"); err == nil && len(services) > 0 {
				atomic.AddInt32(&ok, 1)
			}
		}()
	}
	wg.Wait()

	// 1. No additional registry calls: rate limiting + stale cache absorbed the burst.
	if n := reg.getCallCount(); n != 1 {
		t.Errorf("Expected 1 total call (rate limiting prevented retry), got %d", n)
	}
	// 2. Every request was served from the stale cache without error.
	if ok != workers {
		t.Errorf("Expected all %d requests to succeed with stale cache, got %d", workers, ok)
	}
}
// TestThrottlingWithoutStaleCache verifies that the cache throttles requests
// when there's no stale cache and the registry is failing.
func TestThrottlingWithoutStaleCache(t *testing.T) {
	reg := &mockRegistry{
		err: errors.New("etcd connection failed"),
	}

	// Create cache with a short retry interval for testing.
	c := New(reg, func(o *Options) {
		o.TTL = time.Minute
		o.MinimumRetryInterval = 2 * time.Second
		o.Logger = logger.DefaultLogger
	}).(*cache)

	// First lookup hits the registry, fails, and records the attempt.
	if _, err := c.GetService("test.service"); err == nil {
		t.Fatal("Expected error on first request, got nil")
	}
	if n := reg.getCallCount(); n != 1 {
		t.Fatalf("Expected 1 call on first attempt, got %d", n)
	}

	// An immediate retry must be throttled without touching the registry.
	if _, err := c.GetService("test.service"); err == nil {
		t.Fatal("Expected error on throttled request, got nil")
	}
	if n := reg.getCallCount(); n != 1 {
		t.Errorf("Expected throttling (still 1 call), got %d calls", n)
	}

	// Once the retry interval elapses, the next lookup is allowed through.
	time.Sleep(2100 * time.Millisecond)
	if _, err := c.GetService("test.service"); err == nil {
		t.Fatal("Expected error after retry interval, got nil")
	}
	if n := reg.getCallCount(); n != 2 {
		t.Errorf("Expected 2 calls after retry interval, got %d", n)
	}
}
// TestThrottlingMultipleConcurrentRequests verifies that throttling works
// correctly with multiple concurrent requests when there's no stale cache.
func TestThrottlingMultipleConcurrentRequests(t *testing.T) {
	reg := &mockRegistry{
		err:   errors.New("etcd overloaded"),
		delay: 50 * time.Millisecond,
	}

	c := New(reg, func(o *Options) {
		o.TTL = time.Minute
		o.MinimumRetryInterval = 1 * time.Second
		o.Logger = logger.DefaultLogger
	}).(*cache)

	const workers = 20

	// burst fires `workers` concurrent lookups and waits for all of them.
	burst := func() {
		var wg sync.WaitGroup
		wg.Add(workers)
		for i := 0; i < workers; i++ {
			go func() {
				defer wg.Done()
				c.GetService("test.service")
			}()
		}
		wg.Wait()
	}

	// First burst: singleflight collapses everything into a single call.
	burst()
	if n := reg.getCallCount(); n != 1 {
		t.Errorf("Expected 1 call (singleflight), got %d", n)
	}

	// Second burst immediately after: fully throttled, no new calls.
	burst()
	if n := reg.getCallCount(); n != 1 {
		t.Errorf("Expected throttling (still 1 call), got %d", n)
	}

	// After the retry interval, one additional registry call is permitted.
	time.Sleep(1100 * time.Millisecond)
	burst()
	if n := reg.getCallCount(); n != 2 {
		t.Errorf("Expected 2 calls after retry interval, got %d", n)
	}
}
// TestThrottlingDoesNotAffectSuccessfulLookups verifies that throttling
// doesn't interfere with successful service lookups.
func TestThrottlingDoesNotAffectSuccessfulLookups(t *testing.T) {
	reg := &mockRegistry{
		services: []*registry.Service{
			{
				Name:    "test.service",
				Version: "1.0.0",
				Nodes: []*registry.Node{
					{Id: "node1", Address: "localhost:9090"},
				},
			},
		},
	}

	c := New(reg, func(o *Options) {
		o.TTL = 100 * time.Millisecond
		o.MinimumRetryInterval = 2 * time.Second
		o.Logger = logger.DefaultLogger
	}).(*cache)

	// A run of quick successful lookups; all but the first should be served
	// from the cache since they land well inside the TTL.
	for i := 0; i < 5; i++ {
		services, err := c.GetService("test.service")
		if err != nil {
			t.Fatalf("Request %d failed: %v", i, err)
		}
		if len(services) != 1 {
			t.Fatalf("Request %d got %d services, expected 1", i, len(services))
		}
		time.Sleep(10 * time.Millisecond)
	}

	// Only the initial lookup should have reached the registry.
	if n := reg.getCallCount(); n != 1 {
		t.Errorf("Expected 1 call (cached), got %d", n)
	}
}
// TestThrottlingClearedOnSuccess verifies that failed attempt tracking
// is cleared when a subsequent request succeeds.
func TestThrottlingClearedOnSuccess(t *testing.T) {
	reg := &mockRegistry{
		err: errors.New("temporary failure"),
	}

	c := New(reg, func(o *Options) {
		o.TTL = time.Minute
		o.MinimumRetryInterval = 1 * time.Second
		o.Logger = logger.DefaultLogger
	}).(*cache)

	// First lookup fails against the broken registry.
	if _, err := c.GetService("test.service"); err == nil {
		t.Fatal("Expected error on first request")
	}

	// An immediate retry is throttled and fails without a registry call.
	if _, err := c.GetService("test.service"); err == nil {
		t.Fatal("Expected error on throttled request")
	}
	if n := reg.getCallCount(); n != 1 {
		t.Errorf("Expected 1 call due to throttling, got %d", n)
	}

	// Let the retry window pass, then heal the registry.
	time.Sleep(1100 * time.Millisecond)
	reg.mu.Lock()
	reg.err = nil
	reg.services = []*registry.Service{
		{
			Name:    "test.service",
			Version: "1.0.0",
			Nodes: []*registry.Node{
				{Id: "node1", Address: "localhost:9090"},
			},
		},
	}
	reg.mu.Unlock()

	// The next lookup should reach the registry and succeed.
	services, err := c.GetService("test.service")
	if err != nil {
		t.Fatalf("Expected success after fix, got error: %v", err)
	}
	if len(services) != 1 {
		t.Fatalf("Expected 1 service, got %d", len(services))
	}

	// A follow-up lookup is served from cache — throttling state was cleared.
	services, err = c.GetService("test.service")
	if err != nil {
		t.Fatalf("Expected cached success, got error: %v", err)
	}
	if len(services) != 1 {
		t.Fatalf("Expected 1 service from cache, got %d", len(services))
	}

	// 1 failed + 1 successful registry call; the cached lookup adds none.
	if n := reg.getCallCount(); n != 2 {
		t.Errorf("Expected 2 calls total, got %d", n)
	}
}