diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 039cd1c9aa..589f5360f2 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -1294,6 +1294,36 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src) } } +static void choke_demux(const Scheduler *sch, int demux_id, int choked) +{ + av_assert1(demux_id < sch->nb_demux); + SchDemux *demux = &sch->demux[demux_id]; + + for (int i = 0; i < demux->nb_streams; i++) { + SchedulerNode *dst = demux->streams[i].dst; + SchFilterGraph *fg; + + switch (dst->type) { + case SCH_NODE_TYPE_DEC: + tq_choke(sch->dec[dst->idx].queue, choked); + break; + case SCH_NODE_TYPE_ENC: + tq_choke(sch->enc[dst->idx].queue, choked); + break; + case SCH_NODE_TYPE_MUX: + break; + case SCH_NODE_TYPE_FILTER_IN: + fg = &sch->filters[dst->idx]; + if (fg->nb_inputs == 1) + tq_choke(fg->queue, choked); + break; + default: + av_unreachable("Invalid destination node type?"); + break; + } + } +} + static void schedule_update_locked(Scheduler *sch) { int64_t dts; @@ -1350,13 +1380,16 @@ static void schedule_update_locked(Scheduler *sch) } } - - for (unsigned type = 0; type < 2; type++) + for (unsigned type = 0; type < 2; type++) { for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) { SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter; - if (w->choked_prev != w->choked_next) + if (w->choked_prev != w->choked_next) { waiter_set(w, w->choked_next); + if (!type) + choke_demux(sch, i, w->choked_next); + } } + } } @@ -2595,6 +2628,8 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts) for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) { SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter; waiter_set(w, 1); + if (type) + choke_demux(sch, i, 0); // unfreeze to allow draining } for (unsigned i = 0; i < sch->nb_demux; i++) {