From 9d0b88feb17dabfebcb10b801045c1285fa5e4bc Mon Sep 17 00:00:00 2001 From: Niklas Haas Date: Thu, 4 Sep 2025 18:22:27 +0200 Subject: [PATCH] fftools/ffmpeg_sched: forward demuxer choke status to dst queues Cut off a choked demuxer's output codec/filter queues, effectively preventing them from processing packets while the demuxer is choked. Avoids downstream nodes from piling up extra input that a demuxer shouldn't currently be sending. The main benefit of this is to avoid queuing up excess packets that don't want to be decoded yet, reducing memory consumption for idle inputs by preventing them from being read earlier than needed. --- fftools/ffmpeg_sched.c | 41 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 039cd1c9aa..589f5360f2 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1294,6 +1294,36 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) } } +static void choke_demux(const Scheduler *sch, int demux_id, int choked) +{ + av_assert1(demux_id < sch->nb_demux); + SchDemux *demux = &sch->demux[demux_id]; + + for (int i = 0; i < demux->nb_streams; i++) { + SchedulerNode *dst = demux->streams[i].dst; + SchFilterGraph *fg; + + switch (dst->type) { + case SCH_NODE_TYPE_DEC: + tq_choke(sch->dec[dst->idx].queue, choked); + break; + case SCH_NODE_TYPE_ENC: + tq_choke(sch->enc[dst->idx].queue, choked); + break; + case SCH_NODE_TYPE_MUX: + break; + case SCH_NODE_TYPE_FILTER_IN: + fg = &sch->filters[dst->idx]; + if (fg->nb_inputs == 1) + tq_choke(fg->queue, choked); + break; + default: + av_unreachable("Invalid destination node type?"); + break; + } + } +} + static void schedule_update_locked(Scheduler *sch) { int64_t dts; @@ -1350,13 +1380,16 @@ static void schedule_update_locked(Scheduler *sch) } } - - for (unsigned type = 0; type < 2; type++) + for (unsigned type = 0; type < 2; type++) { for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; - if (w->choked_prev != w->choked_next) + if (w->choked_prev != w->choked_next) { waiter_set(w, w->choked_next); + if (!type) + choke_demux(sch, i, w->choked_next); + } } + } } @@ -2595,6 +2628,8 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts) for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; waiter_set(w, 1); + if (type) + choke_demux(sch, i, 0); // unfreeze to allow draining } for (unsigned i = 0; i < sch->nb_demux; i++) {