mirror of
https://github.com/IBM/fp-go.git
synced 2025-08-10 22:31:32 +02:00
fix: add ioeither
Signed-off-by: Dr. Carsten Leue <carsten.leue@de.ibm.com>
This commit is contained in:
87
retry/generic/retry.go
Normal file
87
retry/generic/retry.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package generic
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
F "github.com/ibm/fp-go/function"
|
||||
O "github.com/ibm/fp-go/option"
|
||||
R "github.com/ibm/fp-go/retry"
|
||||
)
|
||||
|
||||
// Apply policy and delay by its amount if it results in a R.
|
||||
// Returns updated status.
|
||||
// HKTSTATUS = HKT<R.RetryStatus>
|
||||
func applyAndDelay[HKTSTATUS any](
|
||||
monadOf func(R.RetryStatus) HKTSTATUS,
|
||||
monadDelay func(time.Duration) func(HKTSTATUS) HKTSTATUS,
|
||||
) func(policy R.RetryPolicy, status R.RetryStatus) HKTSTATUS {
|
||||
return func(policy R.RetryPolicy, status R.RetryStatus) HKTSTATUS {
|
||||
newStatus := R.ApplyPolicy(policy, status)
|
||||
return F.Pipe1(
|
||||
newStatus.PreviousDelay,
|
||||
O.Fold(
|
||||
F.Nullary2(F.Constant(newStatus), monadOf),
|
||||
func(delay time.Duration) HKTSTATUS {
|
||||
return monadDelay(delay)(monadOf(newStatus))
|
||||
},
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Retry combinator for actions that don't raise exceptions, but
|
||||
// signal in their type the outcome has failed. Examples are the
|
||||
// `Option`, `Either` and `EitherT` monads.
|
||||
//
|
||||
// policy - refers to the retry policy
|
||||
// action - converts a status into an operation to be executed
|
||||
// check - checks if the result of the action needs to be retried
|
||||
func Retrying[HKTA, HKTSTATUS, A any](
|
||||
monadChain func(func(A) HKTA) func(HKTA) HKTA,
|
||||
monadChainStatus func(func(R.RetryStatus) HKTA) func(HKTSTATUS) HKTA,
|
||||
monadOf func(A) HKTA,
|
||||
monadOfStatus func(R.RetryStatus) HKTSTATUS,
|
||||
monadDelay func(time.Duration) func(HKTSTATUS) HKTSTATUS,
|
||||
|
||||
policy R.RetryPolicy,
|
||||
action func(R.RetryStatus) HKTA,
|
||||
check func(A) bool,
|
||||
) HKTA {
|
||||
// delay callback
|
||||
applyDelay := applyAndDelay(monadOfStatus, monadDelay)
|
||||
|
||||
// function to check if we need to retry or not
|
||||
checkForRetry := O.FromPredicate(check)
|
||||
|
||||
var f func(status R.RetryStatus) HKTA
|
||||
|
||||
// need some lazy init because we reference it in the chain
|
||||
f = func(status R.RetryStatus) HKTA {
|
||||
return F.Pipe2(
|
||||
status,
|
||||
action,
|
||||
monadChain(func(a A) HKTA {
|
||||
return F.Pipe3(
|
||||
a,
|
||||
checkForRetry,
|
||||
O.Map(func(a A) HKTA {
|
||||
return F.Pipe1(
|
||||
applyDelay(policy, status),
|
||||
monadChainStatus(func(status R.RetryStatus) HKTA {
|
||||
return F.Pipe1(
|
||||
status.PreviousDelay,
|
||||
O.Fold(F.Constant(monadOf(a)), func(_ time.Duration) HKTA {
|
||||
return f(status)
|
||||
}),
|
||||
)
|
||||
}),
|
||||
)
|
||||
}),
|
||||
O.GetOrElse(F.Constant(monadOf(a))),
|
||||
)
|
||||
}),
|
||||
)
|
||||
}
|
||||
// seed
|
||||
return f(R.DefaultRetryStatus)
|
||||
}
|
108
retry/retry.go
Normal file
108
retry/retry.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package Retry
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
F "github.com/ibm/fp-go/function"
|
||||
M "github.com/ibm/fp-go/monoid"
|
||||
O "github.com/ibm/fp-go/option"
|
||||
"github.com/ibm/fp-go/ord"
|
||||
)
|
||||
|
||||
type RetryStatus struct {
|
||||
// Iteration number, where `0` is the first try
|
||||
IterNumber uint
|
||||
// Delay incurred so far from retries
|
||||
CumulativeDelay time.Duration
|
||||
// Latest attempt's delay. Will always be `none` on first run.
|
||||
PreviousDelay O.Option[time.Duration]
|
||||
}
|
||||
|
||||
// RetryPolicy is a function that takes an `RetryStatus` and
|
||||
// possibly returns a delay in milliseconds. Iteration numbers start
|
||||
// at zero and increase by one on each retry. A //None// return value from
|
||||
// the function implies we have reached the retry limit.
|
||||
type RetryPolicy = func(RetryStatus) O.Option[time.Duration]
|
||||
|
||||
const emptyDuration = time.Duration(0)
|
||||
|
||||
var ordDuration = ord.FromStrictCompare[time.Duration]()
|
||||
|
||||
// 'RetryPolicy' is a 'Monoid'. You can collapse multiple strategies into one using 'concat'.
|
||||
// The semantics of this combination are as follows:
|
||||
//
|
||||
// 1. If either policy returns 'None', the combined policy returns
|
||||
// 'None'. This can be used to inhibit after a number of retries,
|
||||
// for example.
|
||||
//
|
||||
// 2. If both policies return a delay, the larger delay will be used.
|
||||
// This is quite natural when combining multiple policies to achieve a
|
||||
// certain effect.
|
||||
var Monoid = M.FunctionMonoid[RetryStatus](O.ApplicativeMonoid(M.MakeMonoid(
|
||||
ord.MaxSemigroup(ordDuration).Concat, emptyDuration)))
|
||||
|
||||
// LimitRetries retries immediately, but only up to `i` times.
|
||||
func LimitRetries(i uint) RetryPolicy {
|
||||
pred := func(value uint) bool {
|
||||
return value < i
|
||||
}
|
||||
empty := F.Constant1[uint](emptyDuration)
|
||||
return func(status RetryStatus) O.Option[time.Duration] {
|
||||
return F.Pipe2(
|
||||
status.IterNumber,
|
||||
O.FromPredicate(pred),
|
||||
O.Map(empty),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// ConstantDelay delays with unlimited retries
|
||||
func ConstantDelay(delay time.Duration) RetryPolicy {
|
||||
return F.Constant1[RetryStatus](O.Of(delay))
|
||||
}
|
||||
|
||||
// CapDelay sets a time-upperbound for any delays that may be directed by the
|
||||
// given policy. This function does not terminate the retrying. The policy
|
||||
// capDelay(maxDelay, exponentialBackoff(n))` will never stop retrying. It
|
||||
// will reach a state where it retries forever with a delay of `maxDelay`
|
||||
// between each one. To get termination you need to use one of the
|
||||
// 'limitRetries' function variants.
|
||||
func CapDelay(maxDelay time.Duration, policy RetryPolicy) RetryPolicy {
|
||||
return F.Flow2(
|
||||
policy,
|
||||
O.Map(F.Bind1st(ord.Min(ordDuration), maxDelay)),
|
||||
)
|
||||
}
|
||||
|
||||
// ExponentialBackoff grows delay exponentially each iteration.
|
||||
// Each delay will increase by a factor of two.
|
||||
func ExponentialBackoff(delay time.Duration) RetryPolicy {
|
||||
return func(status RetryStatus) O.Option[time.Duration] {
|
||||
return O.Some(delay * time.Duration(math.Pow(2, float64(status.IterNumber))))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initial, default retry status. Exported mostly to allow user code
|
||||
* to test their handlers and retry policies.
|
||||
*/
|
||||
var DefaultRetryStatus = RetryStatus{
|
||||
IterNumber: 0,
|
||||
CumulativeDelay: 0,
|
||||
PreviousDelay: O.None[time.Duration](),
|
||||
}
|
||||
|
||||
var getOrElseDelay = O.GetOrElse(F.Constant(emptyDuration))
|
||||
|
||||
/**
|
||||
* Apply policy on status to see what the decision would be.
|
||||
*/
|
||||
func ApplyPolicy(policy RetryPolicy, status RetryStatus) RetryStatus {
|
||||
previousDelay := policy(status)
|
||||
return RetryStatus{
|
||||
IterNumber: status.IterNumber + 1,
|
||||
CumulativeDelay: status.CumulativeDelay + getOrElseDelay(previousDelay),
|
||||
PreviousDelay: previousDelay,
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user