You've already forked imgproxy
mirror of
https://github.com/imgproxy/imgproxy.git
synced 2026-06-15 22:47:11 +02:00
82 lines
2.1 KiB
Go
82 lines
2.1 KiB
Go
package asyncbuffer
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// condCh is the signal-only channel type that Cond closes to wake waiters.
type condCh = chan struct{}
|
|
|
|
// Cond signals that an event has occurred to multiple waiters.
|
|
// Uses a cursor to detect if Tick() occurred between checking a condition and waiting.
|
|
type Cond struct {
|
|
mu sync.RWMutex
|
|
ch condCh
|
|
cursor atomic.Uint64 // Incremented on each Tick
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
// NewCond creates a new Cond instance with an initialized channel.
|
|
func NewCond() *Cond {
|
|
return &Cond{
|
|
ch: make(condCh),
|
|
}
|
|
}
|
|
|
|
// Tick signals that an event has occurred by closing the channel and incrementing cursor.
|
|
func (t *Cond) Tick() {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if t.ch != nil {
|
|
t.cursor.Add(1) // Increment FIRST so waiters see new cursor when they wake
|
|
close(t.ch) // Then close channel to wake waiters
|
|
t.ch = make(condCh) // Create new channel for next Tick
|
|
}
|
|
}
|
|
|
|
// Cursor returns the current cursor value.
|
|
// Capture this before checking your condition, then pass it to Wait().
|
|
func (t *Cond) Cursor() uint64 {
|
|
return t.cursor.Load()
|
|
}
|
|
|
|
// Wait waits for a Tick() to occur after the given cursor value.
|
|
// Returns the new cursor value when awakened.
|
|
// If the cursor has already advanced beyond the given value, returns immediately with the current cursor.
|
|
func (t *Cond) Wait(cursor uint64) uint64 {
|
|
t.mu.RLock()
|
|
ch := t.ch
|
|
current := t.cursor.Load()
|
|
t.mu.RUnlock()
|
|
|
|
// If cursor already advanced, return immediately
|
|
if current != cursor || ch == nil {
|
|
return current
|
|
}
|
|
|
|
// Wait for the channel to close (Tick happened)
|
|
// If Tick happens after t.mu.RUnlock() and here, we will just wait on a
|
|
// closed channel and return updated cursor value.
|
|
<-ch
|
|
|
|
// NOTE: Another Tick() could happen here before we read cursor.
|
|
// This is acceptable - we return the current cursor value, which may be
|
|
// newer than the Tick() that woke us. The caller will use this cursor
|
|
// for the next wait cycle and re-check their condition.
|
|
return t.cursor.Load()
|
|
}
|
|
|
|
// Close closes the ticker channel and prevents further ticks.
|
|
func (t *Cond) Close() {
|
|
t.closeOnce.Do(func() {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if t.ch != nil {
|
|
close(t.ch)
|
|
t.ch = nil
|
|
}
|
|
})
|
|
}
|