1
0
mirror of https://github.com/FFmpeg/FFmpeg.git synced 2025-10-06 05:47:18 +02:00

fftools/thread_queue: allow choking thread queues directly

Currently, when a demuxer thread is choked, it will avoid queuing more
packets, but any packets already present on the thread queue will still be
processed.

This can be quite wasteful if the choke is due to e.g. decoder not being
needed yet, such as in a filter graph involving concatenation-style filters.
Adding the ability to propagate the choke status to the thread queue directly
allows downstream decoders and filter graphs to avoid unnecessary work and
buffering.

Reduces the effective latency between scheduler updates and changes in the
thread workfload.
This commit is contained in:
Niklas Haas
2025-09-04 18:09:31 +02:00
parent 5f4cbb5617
commit 59a847a237
2 changed files with 26 additions and 0 deletions

View File

@@ -38,6 +38,7 @@ enum {
};
struct ThreadQueue {
int choked;
int *finished;
unsigned int nb_streams;
@@ -157,6 +158,9 @@ static int receive_locked(ThreadQueue *tq, int *stream_idx,
{
unsigned int nb_finished = 0;
if (tq->choked)
return AVERROR(EAGAIN);
while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
unsigned idx;
int ret;
@@ -230,6 +234,7 @@ void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
* next time the consumer thread tries to read this stream it will get
* an EOF and recv-finished flag will be set */
tq->finished[stream_idx] |= FINISHED_SEND;
tq->choked = 0;
pthread_cond_broadcast(&tq->cond);
pthread_mutex_unlock(&tq->lock);
@@ -249,3 +254,15 @@ void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
pthread_mutex_unlock(&tq->lock);
}
void tq_choke(ThreadQueue *tq, int choked)
{
pthread_mutex_lock(&tq->lock);
int prev_choked = tq->choked;
tq->choked = choked;
if (choked != prev_choked)
pthread_cond_broadcast(&tq->cond);
pthread_mutex_unlock(&tq->lock);
}

View File

@@ -58,6 +58,15 @@ int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data);
*/
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx);
/**
* Prevent further reads from the thread queue until it is unchoked. Threads
* attempting to read from the queue will block, similar to when the queue is
* empty.
*
* @param choked 1 to choke, 0 to unchoke
*/
void tq_choke(ThreadQueue *tq, int choked);
/**
* Read the next item from the queue.
*