1
0
mirror of https://github.com/IBM/fp-go.git synced 2026-01-17 00:53:55 +02:00

Compare commits

...

1 Commits

Author SHA1 Message Date
Carsten Leue
8acea9043f fix: refactor circuitbreaker (#152)
Signed-off-by: Dr. Carsten Leue <carsten.leue@de.ibm.com>
2026-01-15 11:36:32 +01:00
15 changed files with 1403 additions and 206 deletions

View File

@@ -4,7 +4,6 @@ import (
"time"
"github.com/IBM/fp-go/v2/either"
"github.com/IBM/fp-go/v2/function"
F "github.com/IBM/fp-go/v2/function"
"github.com/IBM/fp-go/v2/identity"
"github.com/IBM/fp-go/v2/io"
@@ -14,6 +13,7 @@ import (
"github.com/IBM/fp-go/v2/option"
"github.com/IBM/fp-go/v2/pair"
"github.com/IBM/fp-go/v2/reader"
"github.com/IBM/fp-go/v2/readerio"
"github.com/IBM/fp-go/v2/retry"
)
@@ -241,125 +241,155 @@ func isResetTimeExceeded(ct time.Time) option.Kleisli[openState, openState] {
})
}
// handleSuccessOnClosed handles a successful request when the circuit breaker is in closed state.
// It updates the closed state by recording the success and returns an IO operation that
// modifies the breaker state.
// handleSuccessOnClosed creates a Reader that handles successful requests when the circuit is closed.
// This function is used to update the circuit breaker state after a successful operation completes
// while the circuit is in the closed state.
//
// This function is part of the circuit breaker's state management for the closed state.
// When a request succeeds in closed state:
// 1. The current time is obtained
// 2. The addSuccess function is called with the current time to update the ClosedState
// 3. The updated ClosedState is wrapped in a Right (closed) BreakerState
// 4. The breaker state is modified with the new state
// The function takes a Reader that adds a success record to the ClosedState and lifts it to work
// with BreakerState by mapping over the Right (closed) side of the Either type. This ensures that
// success tracking only affects the closed state and leaves any open state unchanged.
//
// Parameters:
// - currentTime: An IO operation that provides the current time
// - addSuccess: A Reader that takes a time and returns an endomorphism for ClosedState,
// typically resetting failure counters or history
// - addSuccess: A Reader that takes the current time and returns an Endomorphism that updates
// the ClosedState by recording a successful operation. This typically increments a success
// counter or updates a success history.
//
// Returns:
// - An io.Kleisli that takes another io.Kleisli and chains them together.
// The outer Kleisli takes an Endomorphism[BreakerState] and returns BreakerState.
// This allows composing the success handling with other state modifications.
// - A Reader[time.Time, Endomorphism[BreakerState]] that, when given the current time, produces
// an endomorphism that updates the BreakerState by applying the success update to the closed
// state (if closed) or leaving the state unchanged (if open).
//
// Thread Safety: This function creates IO operations that will atomically modify the
// IORef[BreakerState] when executed. The state modifications are thread-safe.
//
// Type signature:
//
// io.Kleisli[io.Kleisli[Endomorphism[BreakerState], BreakerState], BreakerState]
// Thread Safety: This is a pure function that creates new state instances. The returned
// endomorphism is safe for concurrent use as it does not mutate its input.
//
// Usage Context:
// - Called when a request succeeds while the circuit is closed
// - Resets failure tracking (counter or history) in the ClosedState
// - Keeps the circuit in closed state
// - Called after a successful request completes while the circuit is closed
// - Updates success metrics/counters in the ClosedState
// - Does not affect the circuit state if it's already open
// - Part of the normal operation flow when the circuit breaker is functioning properly
func handleSuccessOnClosed(
currentTime IO[time.Time],
addSuccess Reader[time.Time, Endomorphism[ClosedState]],
) io.Kleisli[io.Kleisli[Endomorphism[BreakerState], BreakerState], BreakerState] {
) Reader[time.Time, Endomorphism[BreakerState]] {
return F.Flow2(
io.Chain,
identity.Flap[IO[BreakerState]](F.Pipe1(
currentTime,
io.Map(F.Flow2(
addSuccess,
either.Map[openState],
)))),
addSuccess,
either.Map[openState],
)
}
// handleFailureOnClosed handles a failed request when the circuit breaker is in closed state.
// It updates the closed state by recording the failure and checks if the circuit should open.
// handleFailureOnClosed creates a Reader that handles failed requests when the circuit is closed.
// This function manages the critical logic for determining whether a failure should cause the
// circuit breaker to open (transition from closed to open state).
//
// This function is part of the circuit breaker's state management for the closed state.
// When a request fails in closed state:
// 1. The current time is obtained
// 2. The addError function is called to record the failure in the ClosedState
// 3. The checkClosedState function is called to determine if the failure threshold is exceeded
// 4. If the threshold is exceeded (Check returns None):
// - The circuit transitions to open state using openCircuit
// - A new openState is created with resetAt time calculated from the retry policy
// 5. If the threshold is not exceeded (Check returns Some):
// - The circuit remains closed with the updated failure tracking
// The function orchestrates three key operations:
// 1. Records the failure in the ClosedState using addError
// 2. Checks if the failure threshold has been exceeded using checkClosedState
// 3. If threshold exceeded, opens the circuit; otherwise, keeps it closed with updated error count
//
// The decision flow is:
// - Add the error to the closed state's error tracking
// - Check if the updated closed state exceeds the failure threshold
// - If threshold exceeded (checkClosedState returns None):
// - Create a new openState with calculated reset time based on retry policy
// - Transition the circuit to open state (Left side of Either)
// - If threshold not exceeded (checkClosedState returns Some):
// - Keep the circuit closed with the updated error count
// - Continue allowing requests through
//
// Parameters:
// - currentTime: An IO operation that provides the current time
// - addError: A Reader that takes a time and returns an endomorphism for ClosedState,
// recording a failure (incrementing counter or adding to history)
// - checkClosedState: A Reader that takes a time and returns an option.Kleisli that checks
// if the ClosedState should remain closed. Returns Some if circuit stays closed, None if it should open.
// - openCircuit: A Reader that takes a time and returns an openState with calculated resetAt time
// - addError: A Reader that takes the current time and returns an Endomorphism that updates
// the ClosedState by recording a failed operation. This typically increments an error
// counter or adds to an error history.
// - checkClosedState: A Reader that takes the current time and returns an option.Kleisli that
// validates whether the ClosedState is still within acceptable failure thresholds.
// Returns Some(ClosedState) if threshold not exceeded, None if threshold exceeded.
// - openCircuit: A Reader that takes the current time and creates a new openState with
// appropriate reset time calculated from the retry policy. Used when transitioning to open.
//
// Returns:
// - An io.Kleisli that takes another io.Kleisli and chains them together.
// The outer Kleisli takes an Endomorphism[BreakerState] and returns BreakerState.
// This allows composing the failure handling with other state modifications.
// - A Reader[time.Time, Endomorphism[BreakerState]] that, when given the current time, produces
// an endomorphism that either:
// - Keeps the circuit closed with updated error tracking (if threshold not exceeded)
// - Opens the circuit with calculated reset time (if threshold exceeded)
//
// Thread Safety: This function creates IO operations that will atomically modify the
// IORef[BreakerState] when executed. The state modifications are thread-safe.
//
// Type signature:
//
// io.Kleisli[io.Kleisli[Endomorphism[BreakerState], BreakerState], BreakerState]
//
// State Transitions:
// - Closed -> Closed: When failure threshold is not exceeded (Some from checkClosedState)
// - Closed -> Open: When failure threshold is exceeded (None from checkClosedState)
// Thread Safety: This is a pure function that creates new state instances. The returned
// endomorphism is safe for concurrent use as it does not mutate its input.
//
// Usage Context:
// - Called when a request fails while the circuit is closed
// - Records the failure in the ClosedState (counter or history)
// - May trigger transition to open state if threshold is exceeded
// - Called after a failed request completes while the circuit is closed
// - Implements the core circuit breaker logic for opening the circuit
// - Determines when to stop allowing requests through to protect the failing service
// - Critical for preventing cascading failures in distributed systems
//
// State Transition:
// - Closed (under threshold) -> Closed (with incremented error count)
// - Closed (at/over threshold) -> Open (with reset time for recovery attempt)
func handleFailureOnClosed(
currentTime IO[time.Time],
addError Reader[time.Time, Endomorphism[ClosedState]],
checkClosedState Reader[time.Time, option.Kleisli[ClosedState, ClosedState]],
openCircuit Reader[time.Time, openState],
) io.Kleisli[io.Kleisli[Endomorphism[BreakerState], BreakerState], BreakerState] {
return F.Flow2(
io.Chain,
identity.Flap[IO[BreakerState]](F.Pipe1(
currentTime,
io.Map(func(ct time.Time) either.Operator[openState, ClosedState, ClosedState] {
return either.Chain(F.Flow3(
addError(ct),
checkClosedState(ct),
option.Fold(
F.Pipe2(
ct,
lazy.Of,
lazy.Map(F.Flow2(
openCircuit,
createOpenCircuit,
)),
),
createClosedCircuit,
),
))
}))),
) Reader[time.Time, Endomorphism[BreakerState]] {
return F.Pipe2(
F.Pipe1(
addError,
reader.ApS(reader.Map[ClosedState], checkClosedState),
),
reader.Chain(F.Flow2(
reader.Map[ClosedState](option.Fold(
F.Pipe2(
openCircuit,
reader.Map[time.Time](createOpenCircuit),
lazy.Of,
),
F.Flow2(
createClosedCircuit,
reader.Of[time.Time],
),
)),
reader.Sequence,
)),
reader.Map[time.Time](either.Chain[openState, ClosedState, ClosedState]),
)
}
func handleErrorOnClosed2[E any](
checkError option.Kleisli[E, E],
onSuccess Reader[time.Time, Endomorphism[BreakerState]],
onFailure Reader[time.Time, Endomorphism[BreakerState]],
) reader.Kleisli[time.Time, E, Endomorphism[BreakerState]] {
return F.Flow3(
checkError,
option.MapTo[E](onFailure),
option.GetOrElse(lazy.Of(onSuccess)),
)
}
func stateModifier(
modify io.Kleisli[Endomorphism[BreakerState], BreakerState],
) reader.Operator[time.Time, Endomorphism[BreakerState], IO[BreakerState]] {
return reader.Map[time.Time](modify)
}
func reportOnClose2(
onClosed ReaderIO[time.Time, Void],
onOpened ReaderIO[time.Time, Void],
) readerio.Operator[time.Time, BreakerState, Void] {
return readerio.Chain(either.Fold(
reader.Of[openState](onOpened),
reader.Of[ClosedState](onClosed),
))
}
func applyAndReportClose2(
currentTime IO[time.Time],
metrics readerio.Operator[time.Time, BreakerState, Void],
) func(io.Kleisli[Endomorphism[BreakerState], BreakerState]) func(Reader[time.Time, Endomorphism[BreakerState]]) IO[Void] {
return func(modify io.Kleisli[Endomorphism[BreakerState], BreakerState]) func(Reader[time.Time, Endomorphism[BreakerState]]) IO[Void] {
return F.Flow3(
reader.Map[time.Time](modify),
metrics,
readerio.ReadIO[Void](currentTime),
)
}
}
// MakeCircuitBreaker creates a circuit breaker implementation for a higher-kinded type.
@@ -402,6 +432,8 @@ func MakeCircuitBreaker[E, T, HKTT, HKTOP, HKTHKTT any](
chainFirstIOK func(io.Kleisli[T, BreakerState]) func(HKTT) HKTT,
chainFirstLeftIOK func(io.Kleisli[E, BreakerState]) func(HKTT) HKTT,
chainFirstIOK2 func(io.Kleisli[Either[E, T], Void]) func(HKTT) HKTT,
fromIO func(IO[func(HKTT) HKTT]) HKTOP,
flap func(HKTT) func(HKTOP) HKTHKTT,
flatten func(HKTHKTT) HKTT,
@@ -437,47 +469,22 @@ func MakeCircuitBreaker[E, T, HKTT, HKTOP, HKTHKTT any](
reader.Of[HKTT],
)
handleSuccess := handleSuccessOnClosed(currentTime, addSuccess)
handleFailure := handleFailureOnClosed(currentTime, addError, checkClosedState, openCircuit)
handleSuccess2 := handleSuccessOnClosed(addSuccess)
handleFailure2 := handleFailureOnClosed(addError, checkClosedState, openCircuit)
handleError2 := handleErrorOnClosed2(checkError, handleSuccess2, handleFailure2)
metricsClose2 := reportOnClose2(metrics.Accept, metrics.Open)
apply2 := applyAndReportClose2(currentTime, metricsClose2)
onClosed := func(modify io.Kleisli[Endomorphism[BreakerState], BreakerState]) Operator {
return F.Flow2(
// error case
chainFirstLeftIOK(F.Flow3(
checkError,
option.Fold(
// the error is not applicable, handle as success
F.Pipe2(
modify,
handleSuccess,
lazy.Of,
),
// the error is relevant, record it
F.Pipe2(
modify,
handleFailure,
reader.Of[E],
),
),
// metering
io.ChainFirst(either.Fold(
F.Flow2(
openedAtLens.Get,
metrics.Open,
),
func(c ClosedState) IO[Void] {
return io.Of(function.VOID)
},
)),
)),
// good case
chainFirstIOK(F.Pipe2(
modify,
handleSuccess,
reader.Of[T],
)),
)
return chainFirstIOK2(F.Flow2(
either.Fold(
handleError2,
reader.Of[T](handleSuccess2),
),
apply2(modify),
))
}
onCanary := func(modify io.Kleisli[Endomorphism[BreakerState], BreakerState]) Operator {

View File

@@ -5,12 +5,12 @@ import (
"testing"
"time"
"github.com/IBM/fp-go/v2/either"
"github.com/IBM/fp-go/v2/function"
F "github.com/IBM/fp-go/v2/function"
"github.com/IBM/fp-go/v2/io"
"github.com/IBM/fp-go/v2/ioref"
"github.com/IBM/fp-go/v2/option"
"github.com/IBM/fp-go/v2/reader"
"github.com/IBM/fp-go/v2/retry"
"github.com/stretchr/testify/assert"
)
@@ -452,43 +452,128 @@ func TestIsResetTimeExceeded(t *testing.T) {
// TestHandleSuccessOnClosed tests the handleSuccessOnClosed function
func TestHandleSuccessOnClosed(t *testing.T) {
t.Run("resets failure count on success", func(t *testing.T) {
t.Run("updates closed state with success when circuit is closed", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now
addSuccess := reader.From1(ClosedState.AddSuccess)
currentTime := vt.Now()
// Create initial state with some failures
now := vt.Now()
// Create a simple addSuccess reader that increments a counter
addSuccess := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddSuccess(ct)
}
}
// Create initial closed state
initialClosed := MakeClosedStateCounter(3)
initialClosed = initialClosed.AddError(now)
initialClosed = initialClosed.AddError(now)
initialState := createClosedCircuit(initialClosed)
ref := io.Run(ioref.MakeIORef(initialState))
modify := modifyV(ref)
// Apply handleSuccessOnClosed
handler := handleSuccessOnClosed(addSuccess)
endomorphism := handler(currentTime)
result := endomorphism(initialState)
handler := handleSuccessOnClosed(currentTime, addSuccess)
// Verify the state is still closed
assert.True(t, IsClosed(result), "state should remain closed after success")
// Apply the handler
result := io.Run(handler(modify))
// Verify state is still closed and failures are reset
assert.True(t, IsClosed(result), "circuit should remain closed after success")
// Verify the closed state was updated
closedState := either.Fold(
func(openState) ClosedState { return initialClosed },
F.Identity[ClosedState],
)(result)
// The success should have been recorded (implementation-specific verification)
assert.NotNil(t, closedState, "closed state should be present")
})
t.Run("keeps circuit closed", func(t *testing.T) {
t.Run("does not affect open state", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now
addSuccess := reader.From1(ClosedState.AddSuccess)
currentTime := vt.Now()
initialState := createClosedCircuit(MakeClosedStateCounter(3))
ref := io.Run(ioref.MakeIORef(initialState))
modify := modifyV(ref)
addSuccess := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddSuccess(ct)
}
}
handler := handleSuccessOnClosed(currentTime, addSuccess)
result := io.Run(handler(modify))
// Create initial open state
initialOpen := openState{
openedAt: currentTime.Add(-1 * time.Minute),
resetAt: currentTime.Add(1 * time.Minute),
retryStatus: retry.DefaultRetryStatus,
canaryRequest: false,
}
initialState := createOpenCircuit(initialOpen)
assert.True(t, IsClosed(result), "circuit should remain closed")
// Apply handleSuccessOnClosed
handler := handleSuccessOnClosed(addSuccess)
endomorphism := handler(currentTime)
result := endomorphism(initialState)
// Verify the state remains open and unchanged
assert.True(t, IsOpen(result), "state should remain open")
// Extract and verify the open state is unchanged
openResult := either.Fold(
func(os openState) openState { return os },
func(ClosedState) openState { return initialOpen },
)(result)
assert.Equal(t, initialOpen.openedAt, openResult.openedAt, "openedAt should be unchanged")
assert.Equal(t, initialOpen.resetAt, openResult.resetAt, "resetAt should be unchanged")
assert.Equal(t, initialOpen.canaryRequest, openResult.canaryRequest, "canaryRequest should be unchanged")
})
t.Run("preserves time parameter through reader", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
time1 := vt.Now()
vt.Advance(1 * time.Hour)
time2 := vt.Now()
var capturedTime time.Time
addSuccess := func(ct time.Time) Endomorphism[ClosedState] {
capturedTime = ct
return F.Identity[ClosedState]
}
initialClosed := MakeClosedStateCounter(3)
initialState := createClosedCircuit(initialClosed)
handler := handleSuccessOnClosed(addSuccess)
// Apply with time1
endomorphism1 := handler(time1)
endomorphism1(initialState)
assert.Equal(t, time1, capturedTime, "should pass time1 to addSuccess")
// Apply with time2
endomorphism2 := handler(time2)
endomorphism2(initialState)
assert.Equal(t, time2, capturedTime, "should pass time2 to addSuccess")
})
t.Run("composes correctly with multiple successes", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now()
addSuccess := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddSuccess(ct)
}
}
initialClosed := MakeClosedStateCounter(3)
initialState := createClosedCircuit(initialClosed)
handler := handleSuccessOnClosed(addSuccess)
endomorphism := handler(currentTime)
// Apply multiple times
result1 := endomorphism(initialState)
result2 := endomorphism(result1)
result3 := endomorphism(result2)
// All should remain closed
assert.True(t, IsClosed(result1), "state should remain closed after first success")
assert.True(t, IsClosed(result2), "state should remain closed after second success")
assert.True(t, IsClosed(result3), "state should remain closed after third success")
})
}
@@ -496,9 +581,26 @@ func TestHandleSuccessOnClosed(t *testing.T) {
func TestHandleFailureOnClosed(t *testing.T) {
t.Run("keeps circuit closed when threshold not exceeded", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now
addError := reader.From1(ClosedState.AddError)
checkClosedState := reader.From1(ClosedState.Check)
currentTime := vt.Now()
// Create a closed state that allows 3 errors
initialClosed := MakeClosedStateCounter(3)
// addError increments error count
addError := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddError(ct)
}
}
// checkClosedState returns Some if under threshold
checkClosedState := func(ct time.Time) option.Kleisli[ClosedState, ClosedState] {
return func(cs ClosedState) Option[ClosedState] {
return cs.Check(ct)
}
}
// openCircuit creates an open state (shouldn't be called in this test)
openCircuit := func(ct time.Time) openState {
return openState{
openedAt: ct,
@@ -508,26 +610,39 @@ func TestHandleFailureOnClosed(t *testing.T) {
}
}
// Create initial state with room for more failures
now := vt.Now()
initialClosed := MakeClosedStateCounter(5) // threshold is 5
initialClosed = initialClosed.AddError(now)
initialState := createClosedCircuit(initialClosed)
ref := io.Run(ioref.MakeIORef(initialState))
modify := modifyV(ref)
handler := handleFailureOnClosed(addError, checkClosedState, openCircuit)
endomorphism := handler(currentTime)
handler := handleFailureOnClosed(currentTime, addError, checkClosedState, openCircuit)
result := io.Run(handler(modify))
// First error - should stay closed
result1 := endomorphism(initialState)
assert.True(t, IsClosed(result1), "circuit should remain closed after first error")
assert.True(t, IsClosed(result), "circuit should remain closed when threshold not exceeded")
// Second error - should stay closed
result2 := endomorphism(result1)
assert.True(t, IsClosed(result2), "circuit should remain closed after second error")
})
t.Run("opens circuit when threshold exceeded", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now
addError := reader.From1(ClosedState.AddError)
checkClosedState := reader.From1(ClosedState.Check)
currentTime := vt.Now()
// Create a closed state that allows only 2 errors (opens at 2nd error)
initialClosed := MakeClosedStateCounter(2)
addError := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddError(ct)
}
}
checkClosedState := func(ct time.Time) option.Kleisli[ClosedState, ClosedState] {
return func(cs ClosedState) Option[ClosedState] {
return cs.Check(ct)
}
}
openCircuit := func(ct time.Time) openState {
return openState{
openedAt: ct,
@@ -537,26 +652,85 @@ func TestHandleFailureOnClosed(t *testing.T) {
}
}
// Create initial state at threshold
now := vt.Now()
initialClosed := MakeClosedStateCounter(2) // threshold is 2
initialClosed = initialClosed.AddError(now)
initialState := createClosedCircuit(initialClosed)
ref := io.Run(ioref.MakeIORef(initialState))
modify := modifyV(ref)
handler := handleFailureOnClosed(addError, checkClosedState, openCircuit)
endomorphism := handler(currentTime)
handler := handleFailureOnClosed(currentTime, addError, checkClosedState, openCircuit)
result := io.Run(handler(modify))
// First error - should stay closed (count=1, threshold=2)
result1 := endomorphism(initialState)
assert.True(t, IsClosed(result1), "circuit should remain closed after first error")
assert.True(t, IsOpen(result), "circuit should open when threshold exceeded")
// Second error - should open (count=2, threshold=2)
result2 := endomorphism(result1)
assert.True(t, IsOpen(result2), "circuit should open when threshold reached")
})
t.Run("records failure in closed state", func(t *testing.T) {
t.Run("creates open state with correct reset time", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now
addError := reader.From1(ClosedState.AddError)
checkClosedState := reader.From1(ClosedState.Check)
currentTime := vt.Now()
expectedResetTime := currentTime.Add(5 * time.Minute)
initialClosed := MakeClosedStateCounter(1) // Opens at 1st error
addError := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddError(ct)
}
}
checkClosedState := func(ct time.Time) option.Kleisli[ClosedState, ClosedState] {
return func(cs ClosedState) Option[ClosedState] {
return cs.Check(ct)
}
}
openCircuit := func(ct time.Time) openState {
return openState{
openedAt: ct,
resetAt: expectedResetTime,
retryStatus: retry.DefaultRetryStatus,
canaryRequest: false,
}
}
initialState := createClosedCircuit(initialClosed)
handler := handleFailureOnClosed(addError, checkClosedState, openCircuit)
endomorphism := handler(currentTime)
// First error - should open immediately (threshold=1)
result1 := endomorphism(initialState)
assert.True(t, IsOpen(result1), "circuit should open after first error")
// Verify the open state has correct reset time
resultOpen := either.Fold(
func(os openState) openState { return os },
func(ClosedState) openState { return openState{} },
)(result1)
assert.Equal(t, expectedResetTime, resultOpen.resetAt, "reset time should match expected")
assert.Equal(t, currentTime, resultOpen.openedAt, "opened time should be current time")
})
t.Run("edge case: zero error threshold", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now()
// Create a closed state that allows 0 errors (opens immediately)
initialClosed := MakeClosedStateCounter(0)
addError := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddError(ct)
}
}
checkClosedState := func(ct time.Time) option.Kleisli[ClosedState, ClosedState] {
return func(cs ClosedState) Option[ClosedState] {
return cs.Check(ct)
}
}
openCircuit := func(ct time.Time) openState {
return openState{
openedAt: ct,
@@ -566,14 +740,212 @@ func TestHandleFailureOnClosed(t *testing.T) {
}
}
initialState := createClosedCircuit(MakeClosedStateCounter(10))
ref := io.Run(ioref.MakeIORef(initialState))
modify := modifyV(ref)
initialState := createClosedCircuit(initialClosed)
handler := handleFailureOnClosed(currentTime, addError, checkClosedState, openCircuit)
result := io.Run(handler(modify))
handler := handleFailureOnClosed(addError, checkClosedState, openCircuit)
endomorphism := handler(currentTime)
// Should still be closed but with failure recorded
assert.True(t, IsClosed(result), "circuit should remain closed")
// First error should immediately open the circuit
result := endomorphism(initialState)
assert.True(t, IsOpen(result), "circuit should open immediately with zero threshold")
})
t.Run("edge case: very high error threshold", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now()
// Create a closed state that allows 1000 errors
initialClosed := MakeClosedStateCounter(1000)
addError := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddError(ct)
}
}
checkClosedState := func(ct time.Time) option.Kleisli[ClosedState, ClosedState] {
return func(cs ClosedState) Option[ClosedState] {
return cs.Check(ct)
}
}
openCircuit := func(ct time.Time) openState {
return openState{
openedAt: ct,
resetAt: ct.Add(1 * time.Minute),
retryStatus: retry.DefaultRetryStatus,
canaryRequest: false,
}
}
initialState := createClosedCircuit(initialClosed)
handler := handleFailureOnClosed(addError, checkClosedState, openCircuit)
endomorphism := handler(currentTime)
// Apply many errors
result := initialState
for i := 0; i < 100; i++ {
result = endomorphism(result)
}
// Should still be closed after 100 errors
assert.True(t, IsClosed(result), "circuit should remain closed with high threshold")
})
t.Run("preserves time parameter through reader chain", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
time1 := vt.Now()
vt.Advance(2 * time.Hour)
time2 := vt.Now()
var capturedAddErrorTime, capturedCheckTime, capturedOpenTime time.Time
initialClosed := MakeClosedStateCounter(2) // Need 2 errors to open
addError := func(ct time.Time) Endomorphism[ClosedState] {
capturedAddErrorTime = ct
return func(cs ClosedState) ClosedState {
return cs.AddError(ct)
}
}
checkClosedState := func(ct time.Time) option.Kleisli[ClosedState, ClosedState] {
capturedCheckTime = ct
return func(cs ClosedState) Option[ClosedState] {
return cs.Check(ct)
}
}
openCircuit := func(ct time.Time) openState {
capturedOpenTime = ct
return openState{
openedAt: ct,
resetAt: ct.Add(1 * time.Minute),
retryStatus: retry.DefaultRetryStatus,
canaryRequest: false,
}
}
initialState := createClosedCircuit(initialClosed)
handler := handleFailureOnClosed(addError, checkClosedState, openCircuit)
// Apply with time1 - first error, stays closed
endomorphism1 := handler(time1)
result1 := endomorphism1(initialState)
assert.Equal(t, time1, capturedAddErrorTime, "addError should receive time1")
assert.Equal(t, time1, capturedCheckTime, "checkClosedState should receive time1")
// Apply with time2 - second error, should trigger open
endomorphism2 := handler(time2)
endomorphism2(result1)
assert.Equal(t, time2, capturedAddErrorTime, "addError should receive time2")
assert.Equal(t, time2, capturedCheckTime, "checkClosedState should receive time2")
assert.Equal(t, time2, capturedOpenTime, "openCircuit should receive time2")
})
t.Run("handles transition from closed to open correctly", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now()
initialClosed := MakeClosedStateCounter(2) // Opens at 2nd error
addError := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddError(ct)
}
}
checkClosedState := func(ct time.Time) option.Kleisli[ClosedState, ClosedState] {
return func(cs ClosedState) Option[ClosedState] {
return cs.Check(ct)
}
}
openCircuit := func(ct time.Time) openState {
return openState{
openedAt: ct,
resetAt: ct.Add(1 * time.Minute),
retryStatus: retry.DefaultRetryStatus,
canaryRequest: false,
}
}
handler := handleFailureOnClosed(addError, checkClosedState, openCircuit)
endomorphism := handler(currentTime)
// Start with closed state
state := createClosedCircuit(initialClosed)
assert.True(t, IsClosed(state), "initial state should be closed")
// First error - should stay closed (count=1, threshold=2)
state = endomorphism(state)
assert.True(t, IsClosed(state), "should remain closed after first error")
// Second error - should open (count=2, threshold=2)
state = endomorphism(state)
assert.True(t, IsOpen(state), "should open after second error")
// Verify it's truly open with correct properties
resultOpen := either.Fold(
func(os openState) openState { return os },
func(ClosedState) openState { return openState{} },
)(state)
assert.False(t, resultOpen.canaryRequest, "canaryRequest should be false initially")
assert.Equal(t, currentTime, resultOpen.openedAt, "openedAt should be current time")
})
t.Run("does not affect already open state", func(t *testing.T) {
vt := NewVirtualTimer(time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC))
currentTime := vt.Now()
addError := func(ct time.Time) Endomorphism[ClosedState] {
return func(cs ClosedState) ClosedState {
return cs.AddError(ct)
}
}
checkClosedState := func(ct time.Time) option.Kleisli[ClosedState, ClosedState] {
return func(cs ClosedState) Option[ClosedState] {
return cs.Check(ct)
}
}
openCircuit := func(ct time.Time) openState {
return openState{
openedAt: ct,
resetAt: ct.Add(1 * time.Minute),
retryStatus: retry.DefaultRetryStatus,
canaryRequest: false,
}
}
// Start with an already open state
existingOpen := openState{
openedAt: currentTime.Add(-5 * time.Minute),
resetAt: currentTime.Add(5 * time.Minute),
retryStatus: retry.DefaultRetryStatus,
canaryRequest: true,
}
initialState := createOpenCircuit(existingOpen)
handler := handleFailureOnClosed(addError, checkClosedState, openCircuit)
endomorphism := handler(currentTime)
// Apply to open state - should not change it
result := endomorphism(initialState)
assert.True(t, IsOpen(result), "state should remain open")
// The open state should be unchanged since handleFailureOnClosed
// only operates on the Right (closed) side of the Either
openResult := either.Fold(
func(os openState) openState { return os },
func(ClosedState) openState { return openState{} },
)(result)
assert.Equal(t, existingOpen.openedAt, openResult.openedAt, "openedAt should be unchanged")
assert.Equal(t, existingOpen.resetAt, openResult.resetAt, "resetAt should be unchanged")
assert.Equal(t, existingOpen.canaryRequest, openResult.canaryRequest, "canaryRequest should be unchanged")
})
}

View File

@@ -28,7 +28,10 @@ import (
//
// Thread Safety: This type is immutable and safe for concurrent use.
type CircuitBreakerError struct {
Name string
// Name: The name identifying this circuit breaker instance
Name string
// ResetAt: The time at which the circuit breaker will transition from open to half-open state
ResetAt time.Time
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/IBM/fp-go/v2/function"
"github.com/IBM/fp-go/v2/io"
)
type (
@@ -110,6 +111,25 @@ type (
name string
logger *log.Logger
}
// voidMetrics is a no-op implementation of the Metrics interface that does nothing.
// All methods return the same pre-allocated IO[Void] operation that immediately returns
// without performing any action.
//
// This implementation is useful for:
// - Testing scenarios where metrics collection is not needed
// - Production environments where metrics overhead should be eliminated
// - Benchmarking circuit breaker logic without metrics interference
// - Default initialization when no metrics implementation is provided
//
// Thread Safety: This implementation is safe for concurrent use. The noop IO operation
// is immutable and can be safely shared across goroutines.
//
// Performance: This is the most efficient Metrics implementation as it performs no
// operations and has minimal memory overhead (single shared IO[Void] instance).
voidMetrics struct {
noop IO[Void]
}
)
// doLog is a helper method that creates an IO operation for logging a circuit breaker event.
@@ -206,3 +226,79 @@ func (m *loggingMetrics) Canary(ct time.Time) IO[Void] {
func MakeMetricsFromLogger(name string, logger *log.Logger) Metrics {
return &loggingMetrics{name: name, logger: logger}
}
// Open implements the Metrics interface for voidMetrics.
// Returns a no-op IO operation that does nothing.
//
// Thread Safety: Safe for concurrent use.
func (m *voidMetrics) Open(_ time.Time) IO[Void] {
return m.noop
}
// Accept implements the Metrics interface for voidMetrics.
// Returns a no-op IO operation that does nothing.
//
// Thread Safety: Safe for concurrent use.
func (m *voidMetrics) Accept(_ time.Time) IO[Void] {
return m.noop
}
// Canary implements the Metrics interface for voidMetrics.
// Returns a no-op IO operation that does nothing.
//
// Thread Safety: Safe for concurrent use.
func (m *voidMetrics) Canary(_ time.Time) IO[Void] {
return m.noop
}
// Close implements the Metrics interface for voidMetrics.
// Returns a no-op IO operation that does nothing.
//
// Thread Safety: Safe for concurrent use.
func (m *voidMetrics) Close(_ time.Time) IO[Void] {
return m.noop
}
// Reject implements the Metrics interface for voidMetrics.
// Returns a no-op IO operation that does nothing.
//
// Thread Safety: Safe for concurrent use.
func (m *voidMetrics) Reject(_ time.Time) IO[Void] {
return m.noop
}
// MakeVoidMetrics creates a no-op Metrics implementation that performs no operations.
// All methods return the same pre-allocated IO[Void] operation that does nothing when executed.
//
// This is useful for:
// - Testing scenarios where metrics collection is not needed
// - Production environments where metrics overhead should be eliminated
// - Benchmarking circuit breaker logic without metrics interference
// - Default initialization when no metrics implementation is provided
//
// Returns:
// - Metrics: A thread-safe no-op Metrics implementation
//
// Thread Safety: The returned Metrics implementation is safe for concurrent use.
// All methods return the same immutable IO[Void] operation.
//
// Performance: This is the most efficient Metrics implementation with minimal overhead.
// The IO[Void] operation is pre-allocated once and reused for all method calls.
//
// Example:
//
// metrics := MakeVoidMetrics()
//
// // All operations do nothing
// io.Run(metrics.Open(time.Now())) // No-op
// io.Run(metrics.Accept(time.Now())) // No-op
// io.Run(metrics.Reject(time.Now())) // No-op
//
// // Useful for testing
// breaker := MakeCircuitBreaker(
// // ... other parameters ...
// MakeVoidMetrics(), // No metrics overhead
// )
func MakeVoidMetrics() Metrics {
return &voidMetrics{io.Of(function.VOID)}
}

View File

@@ -504,3 +504,443 @@ func TestMetricsIOOperations(t *testing.T) {
assert.Len(t, lines, 3, "should execute multiple times")
})
}
// TestMakeVoidMetrics tests the MakeVoidMetrics constructor
func TestMakeVoidMetrics(t *testing.T) {
t.Run("creates valid Metrics implementation", func(t *testing.T) {
metrics := MakeVoidMetrics()
assert.NotNil(t, metrics, "MakeVoidMetrics should return non-nil Metrics")
})
t.Run("returns voidMetrics type", func(t *testing.T) {
metrics := MakeVoidMetrics()
_, ok := metrics.(*voidMetrics)
assert.True(t, ok, "should return *voidMetrics type")
})
t.Run("initializes noop IO operation", func(t *testing.T) {
metrics := MakeVoidMetrics().(*voidMetrics)
assert.NotNil(t, metrics.noop, "noop IO operation should be initialized")
})
}
// TestVoidMetricsAccept tests the Accept method of voidMetrics
func TestVoidMetricsAccept(t *testing.T) {
t.Run("returns non-nil IO operation", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Accept(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
})
t.Run("IO operation executes without side effects", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Accept(timestamp)
result := io.Run(ioOp)
assert.NotNil(t, result, "IO operation should execute successfully")
})
t.Run("returns same IO operation instance", func(t *testing.T) {
metrics := MakeVoidMetrics().(*voidMetrics)
timestamp := time.Now()
ioOp1 := metrics.Accept(timestamp)
ioOp2 := metrics.Accept(timestamp)
// Both should be non-nil (we can't compare functions directly in Go)
assert.NotNil(t, ioOp1, "should return non-nil IO operation")
assert.NotNil(t, ioOp2, "should return non-nil IO operation")
// Verify they execute without error
io.Run(ioOp1)
io.Run(ioOp2)
})
t.Run("ignores timestamp parameter", func(t *testing.T) {
metrics := MakeVoidMetrics()
time1 := time.Date(2026, 1, 9, 15, 30, 0, 0, time.UTC)
time2 := time.Date(2026, 1, 9, 16, 30, 0, 0, time.UTC)
ioOp1 := metrics.Accept(time1)
ioOp2 := metrics.Accept(time2)
// Should return same operation regardless of timestamp
io.Run(ioOp1)
io.Run(ioOp2)
// No assertions needed - just verify it doesn't panic
})
}
// TestVoidMetricsReject tests the Reject method of voidMetrics
func TestVoidMetricsReject(t *testing.T) {
t.Run("returns non-nil IO operation", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Reject(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
})
t.Run("IO operation executes without side effects", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Reject(timestamp)
result := io.Run(ioOp)
assert.NotNil(t, result, "IO operation should execute successfully")
})
t.Run("returns same IO operation instance", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Reject(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
io.Run(ioOp) // Verify it executes without error
})
}
// TestVoidMetricsOpen tests the Open method of voidMetrics
func TestVoidMetricsOpen(t *testing.T) {
t.Run("returns non-nil IO operation", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Open(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
})
t.Run("IO operation executes without side effects", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Open(timestamp)
result := io.Run(ioOp)
assert.NotNil(t, result, "IO operation should execute successfully")
})
t.Run("returns same IO operation instance", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Open(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
io.Run(ioOp) // Verify it executes without error
})
}
// TestVoidMetricsClose tests the Close method of voidMetrics
func TestVoidMetricsClose(t *testing.T) {
t.Run("returns non-nil IO operation", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Close(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
})
t.Run("IO operation executes without side effects", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Close(timestamp)
result := io.Run(ioOp)
assert.NotNil(t, result, "IO operation should execute successfully")
})
t.Run("returns same IO operation instance", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Close(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
io.Run(ioOp) // Verify it executes without error
})
}
// TestVoidMetricsCanary tests the Canary method of voidMetrics
func TestVoidMetricsCanary(t *testing.T) {
t.Run("returns non-nil IO operation", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Canary(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
})
t.Run("IO operation executes without side effects", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Canary(timestamp)
result := io.Run(ioOp)
assert.NotNil(t, result, "IO operation should execute successfully")
})
t.Run("returns same IO operation instance", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Canary(timestamp)
assert.NotNil(t, ioOp, "should return non-nil IO operation")
io.Run(ioOp) // Verify it executes without error
})
}
// TestVoidMetricsThreadSafety tests concurrent access to voidMetrics
func TestVoidMetricsThreadSafety(t *testing.T) {
t.Run("handles concurrent metric calls", func(t *testing.T) {
metrics := MakeVoidMetrics()
var wg sync.WaitGroup
numGoroutines := 100
wg.Add(numGoroutines * 5) // 5 methods
timestamp := time.Now()
// Launch multiple goroutines calling all methods concurrently
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
io.Run(metrics.Accept(timestamp))
}()
go func() {
defer wg.Done()
io.Run(metrics.Reject(timestamp))
}()
go func() {
defer wg.Done()
io.Run(metrics.Open(timestamp))
}()
go func() {
defer wg.Done()
io.Run(metrics.Close(timestamp))
}()
go func() {
defer wg.Done()
io.Run(metrics.Canary(timestamp))
}()
}
wg.Wait()
// Test passes if no panic occurs
})
t.Run("all methods return valid IO operations concurrently", func(t *testing.T) {
metrics := MakeVoidMetrics()
var wg sync.WaitGroup
numGoroutines := 50
wg.Add(numGoroutines)
timestamp := time.Now()
results := make([]IO[Void], numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func(idx int) {
defer wg.Done()
// Each goroutine calls a different method
switch idx % 5 {
case 0:
results[idx] = metrics.Accept(timestamp)
case 1:
results[idx] = metrics.Reject(timestamp)
case 2:
results[idx] = metrics.Open(timestamp)
case 3:
results[idx] = metrics.Close(timestamp)
case 4:
results[idx] = metrics.Canary(timestamp)
}
}(i)
}
wg.Wait()
// All results should be non-nil and executable
for i, result := range results {
assert.NotNil(t, result, "result %d should be non-nil", i)
io.Run(result) // Verify it executes without error
}
})
}
// TestVoidMetricsPerformance tests performance characteristics
func TestVoidMetricsPerformance(t *testing.T) {
t.Run("has minimal overhead", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
// Execute many operations quickly
iterations := 10000
for i := 0; i < iterations; i++ {
io.Run(metrics.Accept(timestamp))
io.Run(metrics.Reject(timestamp))
io.Run(metrics.Open(timestamp))
io.Run(metrics.Close(timestamp))
io.Run(metrics.Canary(timestamp))
}
// Test passes if it completes quickly without issues
})
t.Run("all methods return valid IO operations", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
// All methods should return non-nil IO operations
accept := metrics.Accept(timestamp)
reject := metrics.Reject(timestamp)
open := metrics.Open(timestamp)
close := metrics.Close(timestamp)
canary := metrics.Canary(timestamp)
assert.NotNil(t, accept, "Accept should return non-nil")
assert.NotNil(t, reject, "Reject should return non-nil")
assert.NotNil(t, open, "Open should return non-nil")
assert.NotNil(t, close, "Close should return non-nil")
assert.NotNil(t, canary, "Canary should return non-nil")
// All should execute without error
io.Run(accept)
io.Run(reject)
io.Run(open)
io.Run(close)
io.Run(canary)
})
}
// TestVoidMetricsIntegration tests integration scenarios
func TestVoidMetricsIntegration(t *testing.T) {
t.Run("can be used as drop-in replacement for loggingMetrics", func(t *testing.T) {
// Create both types of metrics
var buf bytes.Buffer
logger := log.New(&buf, "", 0)
loggingMetrics := MakeMetricsFromLogger("TestCircuit", logger)
voidMetrics := MakeVoidMetrics()
timestamp := time.Now()
// Both should implement the same interface
var m1 Metrics = loggingMetrics
var m2 Metrics = voidMetrics
// Both should be callable
io.Run(m1.Accept(timestamp))
io.Run(m2.Accept(timestamp))
// Logging metrics should have output
assert.NotEmpty(t, buf.String(), "logging metrics should produce output")
// Void metrics should have no observable side effects
// (we can't directly test this, but the test passes if no panic occurs)
})
t.Run("simulates complete circuit breaker lifecycle without side effects", func(t *testing.T) {
metrics := MakeVoidMetrics()
baseTime := time.Date(2026, 1, 9, 15, 30, 0, 0, time.UTC)
// Simulate circuit breaker lifecycle - all should be no-ops
io.Run(metrics.Accept(baseTime))
io.Run(metrics.Accept(baseTime.Add(1 * time.Second)))
io.Run(metrics.Open(baseTime.Add(2 * time.Second)))
io.Run(metrics.Reject(baseTime.Add(3 * time.Second)))
io.Run(metrics.Canary(baseTime.Add(30 * time.Second)))
io.Run(metrics.Close(baseTime.Add(31 * time.Second)))
// Test passes if no panic occurs and completes quickly
})
}
// TestVoidMetricsEdgeCases tests edge cases
func TestVoidMetricsEdgeCases(t *testing.T) {
t.Run("handles zero time", func(t *testing.T) {
metrics := MakeVoidMetrics()
zeroTime := time.Time{}
io.Run(metrics.Accept(zeroTime))
io.Run(metrics.Reject(zeroTime))
io.Run(metrics.Open(zeroTime))
io.Run(metrics.Close(zeroTime))
io.Run(metrics.Canary(zeroTime))
// Test passes if no panic occurs
})
t.Run("handles far future time", func(t *testing.T) {
metrics := MakeVoidMetrics()
futureTime := time.Date(9999, 12, 31, 23, 59, 59, 0, time.UTC)
io.Run(metrics.Accept(futureTime))
io.Run(metrics.Reject(futureTime))
io.Run(metrics.Open(futureTime))
io.Run(metrics.Close(futureTime))
io.Run(metrics.Canary(futureTime))
// Test passes if no panic occurs
})
t.Run("IO operations are idempotent", func(t *testing.T) {
metrics := MakeVoidMetrics()
timestamp := time.Now()
ioOp := metrics.Accept(timestamp)
// Execute same operation multiple times
io.Run(ioOp)
io.Run(ioOp)
io.Run(ioOp)
// Test passes if no panic occurs
})
}
// TestMetricsComparison compares loggingMetrics and voidMetrics
func TestMetricsComparison(t *testing.T) {
t.Run("both implement Metrics interface", func(t *testing.T) {
var buf bytes.Buffer
logger := log.New(&buf, "", 0)
var m1 Metrics = MakeMetricsFromLogger("Test", logger)
var m2 Metrics = MakeVoidMetrics()
assert.NotNil(t, m1)
assert.NotNil(t, m2)
})
t.Run("voidMetrics has no observable side effects unlike loggingMetrics", func(t *testing.T) {
var buf bytes.Buffer
logger := log.New(&buf, "", 0)
loggingMetrics := MakeMetricsFromLogger("Test", logger)
voidMetrics := MakeVoidMetrics()
timestamp := time.Now()
// Logging metrics produces output
io.Run(loggingMetrics.Accept(timestamp))
assert.NotEmpty(t, buf.String(), "logging metrics should produce output")
// Void metrics has no observable output
// (we can only verify it doesn't panic)
io.Run(voidMetrics.Accept(timestamp))
})
}

View File

@@ -34,6 +34,7 @@ import (
"github.com/IBM/fp-go/v2/pair"
"github.com/IBM/fp-go/v2/predicate"
"github.com/IBM/fp-go/v2/reader"
"github.com/IBM/fp-go/v2/readerio"
"github.com/IBM/fp-go/v2/retry"
"github.com/IBM/fp-go/v2/state"
)
@@ -79,10 +80,13 @@ type (
// and produces a value of type A. Used for dependency injection and configuration.
Reader[R, A any] = reader.Reader[R, A]
ReaderIO[R, A any] = readerio.ReaderIO[R, A]
// openState represents the internal state when the circuit breaker is open.
// In the open state, requests are blocked to give the failing service time to recover.
// The circuit breaker will transition to a half-open state (canary request) after resetAt.
openState struct {
// openedAt is the time when the circuit breaker opened the circuit
openedAt time.Time
// resetAt is the time when the circuit breaker should attempt a canary request

View File

@@ -4,6 +4,7 @@ import (
"time"
"github.com/IBM/fp-go/v2/circuitbreaker"
"github.com/IBM/fp-go/v2/context/readerio"
"github.com/IBM/fp-go/v2/option"
"github.com/IBM/fp-go/v2/retry"
)
@@ -27,6 +28,9 @@ func MakeCircuitBreaker[T any](
Left,
ChainFirstIOK,
ChainFirstLeftIOK,
readerio.ChainFirstIOK,
FromIO,
Flap,
Flatten,

View File

@@ -314,8 +314,8 @@ func TestExtendTypeTransformations(t *testing.T) {
t.Run("string to bool transformation", func(t *testing.T) {
isEmpty := Extend(func(e Either[error, string]) bool {
return Fold(
func(err error) bool { return true },
func(s string) bool { return len(s) == 0 },
F.Constant1[error](true),
S.IsEmpty,
)(e)
})
@@ -323,10 +323,10 @@ func TestExtendTypeTransformations(t *testing.T) {
result2 := isEmpty(Right[error]("hello"))
assert.True(t, IsRight(result1))
assert.True(t, GetOrElse(func(error) bool { return false })(result1))
assert.True(t, GetOrElse(F.Constant1[error](false))(result1))
assert.True(t, IsRight(result2))
assert.False(t, GetOrElse(func(error) bool { return true })(result2))
assert.False(t, GetOrElse(F.Constant1[error](true))(result2))
})
}
@@ -367,11 +367,9 @@ func TestExtendWithComplexTypes(t *testing.T) {
result2 := isAdult(Right[error](user2))
assert.True(t, IsRight(result1))
assert.True(t, GetOrElse(func(error) bool { return false })(result1))
assert.True(t, GetOrElse(F.Constant1[error](false))(result1))
assert.True(t, IsRight(result2))
assert.False(t, GetOrElse(func(error) bool { return true })(result2))
assert.False(t, GetOrElse(F.Constant1[error](true))(result2))
})
}
// Made with Bob

View File

@@ -318,5 +318,3 @@ func BenchmarkSemigroup_Concat(b *testing.B) {
_ = personOrd.Compare(p1, p2)
}
}
// Made with Bob

View File

@@ -57,5 +57,3 @@ type (
// personOrd := ageOperator(intOrd)
Operator[A, B any] = Kleisli[Ord[A], B]
)
// Made with Bob

View File

@@ -201,5 +201,3 @@ func ExampleOperator() {
result := personOrd.Compare(p1, p2)
println(result) // 1 (30 > 25)
}
// Made with Bob

View File

@@ -60,6 +60,8 @@ import (
// - You need to partially apply environments in a different order
// - You're composing functions that expect parameters in reverse order
// - You want to curry multi-parameter functions differently
//
//go:inline
func Sequence[R1, R2, A any](ma Reader[R2, Reader[R1, A]]) Kleisli[R2, R1, A] {
return function.Flip(ma)
}

View File

@@ -249,6 +249,34 @@ func MonadChain[R, A, B any](ma Reader[R, A], f Kleisli[R, A, B]) Reader[R, B] {
// Chain sequences two Reader computations where the second depends on the result of the first.
// This is the Monad operation that enables dependent computations.
//
// Relationship with Compose:
//
// Chain and Compose serve different purposes in Reader composition:
//
// - Chain: Monadic composition - sequences Readers that share the SAME environment type.
// The second Reader depends on the VALUE produced by the first Reader, but both
// Readers receive the same environment R. This is the monadic bind (>>=) operation.
// Signature: Chain[R, A, B](f: A -> Reader[R, B]) -> Reader[R, A] -> Reader[R, B]
//
// - Compose: Function composition - chains Readers where the OUTPUT of the first
// becomes the INPUT environment of the second. The environment types can differ.
// This is standard function composition (.) for Readers as functions.
// Signature: Compose[C, R, B](ab: Reader[R, B]) -> Reader[B, C] -> Reader[R, C]
//
// Key Differences:
//
// 1. Environment handling:
// - Chain: Both Readers use the same environment R
// - Compose: First Reader's output B becomes second Reader's input environment
//
// 2. Data flow:
// - Chain: R -> A, then A -> Reader[R, B], both using same R
// - Compose: R -> B, then B -> C (B is both output and environment)
//
// 3. Use cases:
// - Chain: Dependent computations in the same context (e.g., fetch user, then fetch user's posts)
// - Compose: Transforming nested environments (e.g., extract config from app state, then read from config)
//
// Example:
//
// type Config struct { UserId int }
@@ -360,6 +388,53 @@ func Flatten[R, A any](mma Reader[R, Reader[R, A]]) Reader[R, A] {
// Compose composes two Readers sequentially, where the output environment of the first
// becomes the input environment of the second.
//
// Relationship with Chain:
//
// Compose and Chain serve different purposes in Reader composition:
//
// - Compose: Function composition - chains Readers where the OUTPUT of the first
// becomes the INPUT environment of the second. The environment types can differ.
// This is standard function composition (.) for Readers as functions.
// Signature: Compose[C, R, B](ab: Reader[R, B]) -> Reader[B, C] -> Reader[R, C]
//
// - Chain: Monadic composition - sequences Readers that share the SAME environment type.
// The second Reader depends on the VALUE produced by the first Reader, but both
// Readers receive the same environment R. This is the monadic bind (>>=) operation.
// Signature: Chain[R, A, B](f: A -> Reader[R, B]) -> Reader[R, A] -> Reader[R, B]
//
// Key Differences:
//
// 1. Environment handling:
// - Compose: First Reader's output B becomes second Reader's input environment
// - Chain: Both Readers use the same environment R
//
// 2. Data flow:
// - Compose: R -> B, then B -> C (B is both output and environment)
// - Chain: R -> A, then A -> Reader[R, B], both using same R
//
// 3. Use cases:
// - Compose: Transforming nested environments (e.g., extract config from app state, then read from config)
// - Chain: Dependent computations in the same context (e.g., fetch user, then fetch user's posts)
//
// Visual Comparison:
//
// // Compose: Environment transformation
// type AppState struct { Config Config }
// type Config struct { Port int }
// getConfig := func(s AppState) Config { return s.Config }
// getPort := func(c Config) int { return c.Port }
// getPortFromState := reader.Compose(getConfig)(getPort)
// // Flow: AppState -> Config -> int (Config is both output and next input)
//
// // Chain: Same environment, dependent values
// type Env struct { UserId int; Users map[int]string }
// getUserId := func(e Env) int { return e.UserId }
// getUser := func(id int) reader.Reader[Env, string] {
// return func(e Env) string { return e.Users[id] }
// }
// getUserName := reader.Chain(getUser)(getUserId)
// // Flow: Env -> int, then int -> Reader[Env, string] (Env used twice)
//
// Example:
//
// type Config struct { Port int }

View File

@@ -1112,6 +1112,63 @@ func Read[A, R any](r R) func(ReaderIO[R, A]) IO[A] {
return reader.Read[IO[A]](r)
}
// ReadIO executes a ReaderIO computation by providing an environment wrapped in an IO effect.
// This is useful when the environment itself needs to be computed or retrieved through side effects.
//
// The function takes an IO[R] (an effectful computation that produces an environment) and returns
// a function that can execute a ReaderIO[R, A] to produce an IO[A].
//
// This is particularly useful in scenarios where:
// - The environment needs to be loaded from a file, database, or network
// - The environment requires initialization with side effects
// - You want to compose environment retrieval with the computation that uses it
//
// The execution flow is:
// 1. Execute the IO[R] to get the environment R
// 2. Pass the environment to the ReaderIO[R, A] to get an IO[A]
// 3. Execute the resulting IO[A] to get the final result A
//
// Type Parameters:
// - A: The result type of the ReaderIO computation
// - R: The environment type required by the ReaderIO
//
// Parameters:
// - r: An IO effect that produces the environment of type R
//
// Returns:
// - A function that takes a ReaderIO[R, A] and returns an IO[A]
//
// Example:
//
// type Config struct {
// DatabaseURL string
// Port int
// }
//
// // Load config from file (side effect)
// loadConfig := io.Of(Config{DatabaseURL: "localhost:5432", Port: 8080})
//
// // A computation that uses the config
// getConnectionString := readerio.Asks(func(c Config) io.IO[string] {
// return io.Of(c.DatabaseURL)
// })
//
// // Compose them together
// result := readerio.ReadIO[string](loadConfig)(getConnectionString)
// connectionString := result() // Executes both effects and returns "localhost:5432"
//
// Comparison with Read:
// - [Read]: Takes a pure value R and executes the ReaderIO immediately
// - [ReadIO]: Takes an IO[R] and chains the effects together
//
//go:inline
func ReadIO[A, R any](r IO[R]) func(ReaderIO[R, A]) IO[A] {
return function.Flow2(
io.Chain[R, A],
Read[A](r),
)
}
// Delay creates an operation that passes in the value after some delay
//
//go:inline

View File

@@ -23,6 +23,7 @@ import (
"github.com/IBM/fp-go/v2/internal/utils"
G "github.com/IBM/fp-go/v2/io"
N "github.com/IBM/fp-go/v2/number"
S "github.com/IBM/fp-go/v2/string"
"github.com/stretchr/testify/assert"
)
@@ -697,6 +698,150 @@ func TestRead(t *testing.T) {
assert.Equal(t, 42, result)
}
func TestReadIO(t *testing.T) {
t.Run("basic usage with IO environment", func(t *testing.T) {
// Create a ReaderIO that uses the config
rio := Of[ReaderTestConfig](42)
// Create an IO that produces the config
configIO := G.Of(ReaderTestConfig{Value: 21, Name: "test"})
// Use ReadIO to execute the ReaderIO with the IO environment
result := ReadIO[int](configIO)(rio)()
assert.Equal(t, 42, result)
})
t.Run("chains IO effects correctly", func(t *testing.T) {
// Track execution order
executionOrder := []string{}
// Create an IO that produces the config with a side effect
configIO := func() ReaderTestConfig {
executionOrder = append(executionOrder, "load config")
return ReaderTestConfig{Value: 10, Name: "test"}
}
// Create a ReaderIO that uses the config with a side effect
rio := func(c ReaderTestConfig) G.IO[int] {
return func() int {
executionOrder = append(executionOrder, "use config")
return c.Value * 3
}
}
// Execute the composed computation
result := ReadIO[int](configIO)(rio)()
assert.Equal(t, 30, result)
assert.Equal(t, []string{"load config", "use config"}, executionOrder)
})
t.Run("works with complex environment loading", func(t *testing.T) {
// Simulate loading config from a file or database
loadConfigFromDB := func() ReaderTestConfig {
// Simulate side effect
return ReaderTestConfig{Value: 100, Name: "production"}
}
// A computation that depends on the loaded config
getConnectionString := func(c ReaderTestConfig) G.IO[string] {
return G.Of(c.Name + ":" + S.Format[int]("%d")(c.Value))
}
result := ReadIO[string](loadConfigFromDB)(getConnectionString)()
assert.Equal(t, "production:100", result)
})
t.Run("composes with other ReaderIO operations", func(t *testing.T) {
configIO := G.Of(ReaderTestConfig{Value: 5, Name: "test"})
// Build a pipeline using ReaderIO operations
pipeline := F.Pipe2(
Ask[ReaderTestConfig](),
Map[ReaderTestConfig](func(c ReaderTestConfig) int { return c.Value }),
Chain(func(n int) ReaderIO[ReaderTestConfig, int] {
return Of[ReaderTestConfig](n * 4)
}),
)
result := ReadIO[int](configIO)(pipeline)()
assert.Equal(t, 20, result)
})
t.Run("handles environment with multiple fields", func(t *testing.T) {
configIO := G.Of(ReaderTestConfig{Value: 42, Name: "answer"})
// Access both fields from the environment
rio := func(c ReaderTestConfig) G.IO[string] {
return G.Of(c.Name + "=" + S.Format[int]("%d")(c.Value))
}
result := ReadIO[string](configIO)(rio)()
assert.Equal(t, "answer=42", result)
})
t.Run("lazy evaluation - IO not executed until called", func(t *testing.T) {
executed := false
configIO := func() ReaderTestConfig {
executed = true
return ReaderTestConfig{Value: 1, Name: "test"}
}
rio := Of[ReaderTestConfig](42)
// Create the composed IO but don't execute it yet
composedIO := ReadIO[int](configIO)(rio)
// Config IO should not be executed yet
assert.False(t, executed)
// Now execute it
result := composedIO()
// Now it should be executed
assert.True(t, executed)
assert.Equal(t, 42, result)
})
t.Run("works with ChainIOK", func(t *testing.T) {
configIO := G.Of(ReaderTestConfig{Value: 10, Name: "test"})
pipeline := F.Pipe1(
Of[ReaderTestConfig](5),
ChainIOK[ReaderTestConfig](func(n int) G.IO[int] {
return G.Of(n * 2)
}),
)
result := ReadIO[int](configIO)(pipeline)()
assert.Equal(t, 10, result)
})
t.Run("comparison with Read - different input types", func(t *testing.T) {
rio := func(c ReaderTestConfig) G.IO[int] {
return G.Of(c.Value + 10)
}
config := ReaderTestConfig{Value: 5, Name: "test"}
// Using Read with a pure value
resultRead := Read[int](config)(rio)()
// Using ReadIO with an IO value
resultReadIO := ReadIO[int](G.Of(config))(rio)()
// Both should produce the same result
assert.Equal(t, 15, resultRead)
assert.Equal(t, 15, resultReadIO)
})
}
func TestTapWithLogging(t *testing.T) {
// Simulate logging scenario
logged := []int{}