package utils

import (
	"context"
	"runtime/debug"
	"sync/atomic"
	"time"

	"github.com/mattermost/mattermost-server/v6/shared/mlog"
)

// CallbackFunc is a func that can be enqueued in the callback queue and will be
// called when dequeued. A non-nil error returned by the func is logged by the queue.
type CallbackFunc func() error

// CallbackQueue provides a simple thread pool for processing callbacks. Callbacks will
// be executed in the order in which they are enqueued, but no guarantees are provided
// regarding the order in which they finish (unless poolSize == 1).
//
// The pool "threads" are goroutines started by NewCallbackQueue; a queue must be
// created with NewCallbackQueue so that its channels are initialized.
type CallbackQueue struct {
	// name is attached to every log record emitted by the queue; poolSize is the
	// number of worker goroutines started for it.
	name     string
	poolSize int

	// queue holds callbacks waiting to be executed. done is closed by Shutdown to
	// tell the workers to exit. alive receives the id of each worker as it exits,
	// which is how Shutdown counts the workers that have stopped.
	queue chan CallbackFunc
	done  chan struct{}
	alive chan int

	// idone is accessed atomically: it is 0 while the queue accepts enqueues and is
	// set to 1 by the first call to Shutdown.
	idone uint32

	// logger receives the queue's diagnostics (backlog warnings, callback errors
	// and panics, worker exits).
	logger mlog.LoggerIFace
}

// NewCallbackQueue builds a CallbackQueue and launches the worker goroutines that
// service it.
//
// name labels the queue in log output, queueSize is the capacity of the buffer of
// pending callbacks, poolSize is the number of workers started, and logger receives
// the queue's log messages.
func NewCallbackQueue(name string, queueSize int, poolSize int, logger mlog.LoggerIFace) *CallbackQueue {
	q := new(CallbackQueue)
	q.name = name
	q.poolSize = poolSize
	q.logger = logger

	q.queue = make(chan CallbackFunc, queueSize)
	q.done = make(chan struct{})
	// One slot per worker, so each worker can report its exit without blocking.
	q.alive = make(chan int, poolSize)

	// Every worker is handed its index as an id.
	for workerID := 0; workerID < poolSize; workerID++ {
		go q.loop(workerID)
	}

	return q
}

// Shutdown closes the queue to new enqueues and tells every pool thread to exit,
// then waits for them for as long as the supplied context permits. Once all threads
// have exited, callbacks still sitting in the queue are run on the calling goroutine.
// Returns true if the pool exited and the queue was emptied, or if Shutdown had
// already been called; returns false if the context ended before that completed.
func (cn *CallbackQueue) Shutdown(context context.Context) bool {
	// Only the first caller flips the flag and performs the shutdown.
	if first := atomic.CompareAndSwapUint32(&cn.idone, 0, 1); !first {
		return true
	}

	// Closing done is the exit signal observed by every worker's select.
	close(cn.done)

	// Collect one exit notification per worker, giving up if the context ends.
	for pending := cn.poolSize; pending > 0; pending-- {
		select {
		case <-cn.alive:
		case <-context.Done():
			return false
		}
	}

	// Run whatever is still queued; finding the queue empty means we are finished.
	for {
		select {
		case <-context.Done():
			return false
		case cb := <-cn.queue:
			cn.exec(cb)
		default:
			return true
		}
	}
}

// Enqueue adds a callback to the queue.
//
// If the queue has already been shut down the callback is dropped and a debug
// message is logged. If the queue is full, Enqueue blocks until space becomes
// available and then logs a warning with the time spent waiting. Should Shutdown be
// called while Enqueue is blocked, the callback is dropped instead, so that a caller
// can never be left waiting on a queue that no longer has workers servicing it.
func (cn *CallbackQueue) Enqueue(f CallbackFunc) {
	if atomic.LoadUint32(&cn.idone) != 0 {
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
		return
	}

	// Fast path: hand the callback over without waiting when there is room.
	select {
	case cn.queue <- f:
		return
	default:
	}

	// Slow path: the queue is full. Wait for space, but also watch the done channel;
	// once it is closed the workers stop receiving, and a plain blocking send could
	// hang forever.
	start := time.Now()
	select {
	case cn.queue <- f:
		dur := time.Since(start)
		cn.logger.Warn("CallbackQueue queue backlog", mlog.String("name", cn.name), mlog.Duration("wait_time", dur))
	case <-cn.done:
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
	}
}

// loop is the body of one pool thread. It runs queued callbacks until the done
// channel is closed, then logs its exit and reports its id on the alive channel.
func (cn *CallbackQueue) loop(id int) {
	reportExit := func() {
		cn.logger.Trace("CallbackQueue thread exited", mlog.String("name", cn.name), mlog.Int("id", id))
		cn.alive <- id
	}
	defer reportExit()

	for {
		select {
		case <-cn.done:
			return
		case job := <-cn.queue:
			cn.exec(job)
		}
	}
}

// exec invokes a single callback and logs any error it returns. A panic raised by
// the callback is recovered and logged together with a stack trace, so the calling
// goroutine keeps running.
func (cn *CallbackQueue) exec(f CallbackFunc) {
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}

		trace := string(debug.Stack())
		cn.logger.Error("CallbackQueue callback panic",
			mlog.String("name", cn.name),
			mlog.Any("panic", cause),
			mlog.String("stack", trace),
		)
	}()

	err := f()
	if err == nil {
		return
	}
	cn.logger.Error("CallbackQueue callback error", mlog.String("name", cn.name), mlog.Err(err))
}