1
0
mirror of https://github.com/FFmpeg/FFmpeg.git synced 2025-10-06 05:47:18 +02:00

fftools/ffmpeg_sched: forward demuxer choke status to dst queues

Cut off a choked demuxer's output codec/filter queues, effectively preventing
them from processing packets while the demuxer is choked. Avoids downstream
nodes from piling up extra input that a demuxer shouldn't currently be
sending.

The main benefit of this is to avoid queuing up excess packets that don't want
to be decoded yet, reducing memory consumption for idle inputs by preventing
them from being read earlier than needed.
This commit is contained in:
Niklas Haas
2025-09-04 18:22:27 +02:00
parent 59a847a237
commit 9d0b88feb1

View File

@@ -1294,6 +1294,36 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
}
}
static void choke_demux(const Scheduler *sch, int demux_id, int choked)
{
av_assert1(demux_id < sch->nb_demux);
SchDemux *demux = &sch->demux[demux_id];
for (int i = 0; i < demux->nb_streams; i++) {
SchedulerNode *dst = demux->streams[i].dst;
SchFilterGraph *fg;
switch (dst->type) {
case SCH_NODE_TYPE_DEC:
tq_choke(sch->dec[dst->idx].queue, choked);
break;
case SCH_NODE_TYPE_ENC:
tq_choke(sch->enc[dst->idx].queue, choked);
break;
case SCH_NODE_TYPE_MUX:
break;
case SCH_NODE_TYPE_FILTER_IN:
fg = &sch->filters[dst->idx];
if (fg->nb_inputs == 1)
tq_choke(fg->queue, choked);
break;
default:
av_unreachable("Invalid destination node type?");
break;
}
}
}
static void schedule_update_locked(Scheduler *sch)
{
int64_t dts;
@@ -1350,13 +1380,16 @@ static void schedule_update_locked(Scheduler *sch)
}
}
for (unsigned type = 0; type < 2; type++)
for (unsigned type = 0; type < 2; type++) {
for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
if (w->choked_prev != w->choked_next)
if (w->choked_prev != w->choked_next) {
waiter_set(w, w->choked_next);
if (!type)
choke_demux(sch, i, w->choked_next);
}
}
}
}
@@ -2595,6 +2628,8 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
waiter_set(w, 1);
if (type)
choke_demux(sch, i, 0); // unfreeze to allow draining
}
for (unsigned i = 0; i < sch->nb_demux; i++) {