1
0
mirror of https://github.com/mattermost/focalboard.git synced 2025-01-08 15:06:08 +02:00
focalboard/server/utils/callbackqueue.go
Rajat Dabade c8e729b6fe
[Refactor]: updated dependency for focalboard server (#5009)
* refactor: updated dependency for focalboard server

* chore: more dependency fixes

* refactor: removed the useless code

* refactor: added ctx for login and removed unnecessary code

* refactor: bump up go version

* refactor: removed the commented code

* chore: upgraded golinter version

* fix: linter issue

* refactor: removed feature flag, fix golinter

* refactor: removed feature flag from code

* revert: statistic and its function

* refactor: removed ProductLimit related code

* refactor: removed isWithinViewsLimit implementation

* refactor: moved function GetUsedCardsCount to statistics.go from cloud.go

* refactor: removed insight code board

* refactor: removed limit dialog

* refactor: updated dependencies for linux

* chore: golinter fix

* chore: updated helper test function to use newLogger

* fix: go test

* refactor: db ping attempts from config

* revert: feature in action

* revert: feature flag in action

* revert: boardsEditor setting

---------

Co-authored-by: Rajat Dabade <rajat@Rajats-MacBook-Pro.local>
2024-06-07 23:30:08 +05:30

136 lines
3.0 KiB
Go

package utils
import (
"context"
"runtime/debug"
"sync/atomic"
"time"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// CallbackFunc is a func that can be enqueued in the callback queue and will be
// called when dequeued. A non-nil error returned by the callback is logged by
// the queue; it is not returned to the code that enqueued the callback.
type CallbackFunc func() error
// CallbackQueue provides a simple thread pool for processing callbacks. Callbacks will
// be executed in the order in which they are enqueued, but no guarantees are provided
// regarding the order in which they finish (unless poolSize == 1).
//
// Instances are created with NewCallbackQueue, which also starts the worker
// goroutines that service the queue.
type CallbackQueue struct {
// name identifies this queue in log messages.
name string
// poolSize is the number of worker goroutines started for this queue, and
// therefore the number of exit signals Shutdown waits for.
poolSize int
// queue is the buffered channel holding callbacks that are waiting to run.
queue chan CallbackFunc
// done is closed by Shutdown to tell the workers to exit.
done chan struct{}
// alive receives a worker's id when that worker exits (despite the name, a
// value here means the worker is no longer running).
alive chan int
// idone is 0 while the queue accepts callbacks and 1 once Shutdown has been
// called; it is read and written with sync/atomic.
idone uint32
// logger receives trace, debug, warning and error messages from the queue.
logger mlog.LoggerIFace
}
// NewCallbackQueue builds a CallbackQueue and launches the worker goroutines
// that service it.
//
// name is used to identify the queue in log output, queueSize is the capacity
// of the buffer holding pending callbacks, poolSize is the number of worker
// goroutines to start, and logger receives the queue's log messages.
func NewCallbackQueue(name string, queueSize int, poolSize int, logger mlog.LoggerIFace) *CallbackQueue {
	pending := make(chan CallbackFunc, queueSize)
	stop := make(chan struct{})
	// Sized to poolSize so every worker can report its exit without blocking.
	exited := make(chan int, poolSize)

	cq := &CallbackQueue{
		name:     name,
		poolSize: poolSize,
		queue:    pending,
		done:     stop,
		alive:    exited,
		logger:   logger,
	}

	// One goroutine per pool slot; the index doubles as the worker id.
	for workerID := 0; workerID < poolSize; workerID++ {
		go cq.loop(workerID)
	}

	return cq
}
// Shutdown stops accepting enqueues and exits all pool threads. This method waits
// as long as the context allows for the threads to exit, then runs any callbacks
// still sitting in the queue on the calling goroutine.
//
// Returns true if the pool exited and the queue was drained, false if ctx was
// done first. Only the first call performs the shutdown; every later call
// returns true immediately, regardless of how the first call ended.
func (cn *CallbackQueue) Shutdown(ctx context.Context) bool {
	// Flip the shutdown flag exactly once so that done is never closed twice.
	if !atomic.CompareAndSwapUint32(&cn.idone, 0, 1) {
		// already shutdown
		return true
	}

	// signal threads to exit
	close(cn.done)

	// wait for the threads to exit or timeout; each worker sends on alive
	// exactly once, as it exits.
	count := 0
	for count < cn.poolSize {
		select {
		case <-cn.alive:
			count++
		case <-ctx.Done():
			return false
		}
	}

	// try to drain any remaining callbacks; the default case ends the loop as
	// soon as the queue is observed empty.
	for {
		select {
		case f := <-cn.queue:
			cn.exec(f)
		case <-ctx.Done():
			return false
		default:
			return true
		}
	}
}
// Enqueue adds a callback to the queue.
//
// If Shutdown has already been called the callback is dropped and a debug
// message is logged. If the queue is full, Enqueue blocks until a slot frees
// up and then logs a warning reporting how long the caller waited. If Shutdown
// is called while Enqueue is blocked, the callback is dropped instead of
// leaving the caller parked on a queue that no worker is servicing any more.
func (cn *CallbackQueue) Enqueue(f CallbackFunc) {
	if atomic.LoadUint32(&cn.idone) != 0 {
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
		return
	}

	// Fast path: there is room in the queue, no waiting and no logging.
	select {
	case cn.queue <- f:
		return
	default:
	}

	// Slow path: the queue is full. Wait for a free slot, but also watch done
	// so a concurrent Shutdown cannot leave this goroutine blocked forever.
	start := time.Now()
	select {
	case cn.queue <- f:
		dur := time.Since(start)
		cn.logger.Warn("CallbackQueue queue backlog", mlog.String("name", cn.name), mlog.Duration("wait_time", dur))
	case <-cn.done:
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
	}
}
// loop is the body of a single worker goroutine. It runs queued callbacks
// until the done channel is closed, then logs its exit and reports its id on
// the alive channel so Shutdown can count it.
func (cn *CallbackQueue) loop(id int) {
	reportExit := func() {
		cn.logger.Trace("CallbackQueue thread exited", mlog.String("name", cn.name), mlog.Int("id", id))
		cn.alive <- id
	}
	defer reportExit()

	for {
		select {
		case <-cn.done:
			// Shutdown requested; the deferred reportExit runs on return.
			return
		case callback := <-cn.queue:
			cn.exec(callback)
		}
	}
}
// exec runs a single callback. An error returned by the callback is logged,
// and a panic raised by the callback is recovered and logged together with a
// stack trace, so that neither one terminates the calling goroutine.
func (cn *CallbackQueue) exec(f CallbackFunc) {
	// Recover here so that a misbehaving callback cannot take the worker down.
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		trace := debug.Stack()
		cn.logger.Error("CallbackQueue callback panic",
			mlog.String("name", cn.name),
			mlog.Any("panic", recovered),
			mlog.String("stack", string(trace)),
		)
	}()

	err := f()
	if err == nil {
		return
	}
	cn.logger.Error("CallbackQueue callback error", mlog.String("name", cn.name), mlog.Err(err))
}