1
0
mirror of https://github.com/imgproxy/imgproxy.git synced 2026-06-15 22:47:11 +02:00
Files
2026-05-20 20:58:07 +02:00

82 lines
2.1 KiB
Go

package asyncbuffer
import (
"sync"
"sync/atomic"
)
type condCh = chan struct{}
// Cond signals that an event has occurred to multiple waiters.
// Uses a cursor to detect if Tick() occurred between checking a condition and waiting.
type Cond struct {
mu sync.RWMutex
ch condCh
cursor atomic.Uint64 // Incremented on each Tick
closeOnce sync.Once
}
// NewCond creates a new Cond instance with an initialized channel.
func NewCond() *Cond {
return &Cond{
ch: make(condCh),
}
}
// Tick signals that an event has occurred by closing the channel and incrementing cursor.
func (t *Cond) Tick() {
t.mu.Lock()
defer t.mu.Unlock()
if t.ch != nil {
t.cursor.Add(1) // Increment FIRST so waiters see new cursor when they wake
close(t.ch) // Then close channel to wake waiters
t.ch = make(condCh) // Create new channel for next Tick
}
}
// Cursor returns the current cursor value.
// Capture this before checking your condition, then pass it to Wait().
func (t *Cond) Cursor() uint64 {
return t.cursor.Load()
}
// Wait waits for a Tick() to occur after the given cursor value.
// Returns the new cursor value when awakened.
// If the cursor has already advanced beyond the given value, returns immediately with the current cursor.
func (t *Cond) Wait(cursor uint64) uint64 {
t.mu.RLock()
ch := t.ch
current := t.cursor.Load()
t.mu.RUnlock()
// If cursor already advanced, return immediately
if current != cursor || ch == nil {
return current
}
// Wait for the channel to close (Tick happened)
// If Tick happens after t.mu.RUnlock() and here, we will just wait on a
// closed channel and return updated cursor value.
<-ch
// NOTE: Another Tick() could happen here before we read cursor.
// This is acceptable - we return the current cursor value, which may be
// newer than the Tick() that woke us. The caller will use this cursor
// for the next wait cycle and re-check their condition.
return t.cursor.Load()
}
// Close closes the ticker channel and prevents further ticks.
func (t *Cond) Close() {
t.closeOnce.Do(func() {
t.mu.Lock()
defer t.mu.Unlock()
if t.ch != nil {
close(t.ch)
t.ch = nil
}
})
}