mirror of
https://github.com/FFmpeg/FFmpeg.git
synced 2025-01-29 22:00:58 +02:00
fftools/ffmpeg_sched: allow encoders to send to multiple destinations
Will become useful in following commits.
This commit is contained in:
parent
3febc84e5e
commit
6ccbff2631
@ -104,7 +104,9 @@ typedef struct SchEnc {
|
||||
const AVClass *class;
|
||||
|
||||
SchedulerNode src;
|
||||
SchedulerNode dst;
|
||||
SchedulerNode *dst;
|
||||
uint8_t *dst_finished;
|
||||
unsigned nb_dst;
|
||||
|
||||
// [0] - index of the sync queue in Scheduler.sq_enc,
|
||||
// [1] - index of this encoder in the sq
|
||||
@ -134,7 +136,9 @@ typedef struct SchEnc {
|
||||
ThreadQueue *queue;
|
||||
// tq_send() to queue returned EOF
|
||||
int in_finished;
|
||||
int out_finished;
|
||||
|
||||
// temporary storage used by sch_enc_send()
|
||||
AVPacket *send_pkt;
|
||||
} SchEnc;
|
||||
|
||||
typedef struct SchDemuxStream {
|
||||
@ -569,6 +573,11 @@ void sch_free(Scheduler **psch)
|
||||
SchEnc *enc = &sch->enc[i];
|
||||
|
||||
tq_free(&enc->queue);
|
||||
|
||||
av_packet_free(&enc->send_pkt);
|
||||
|
||||
av_freep(&enc->dst);
|
||||
av_freep(&enc->dst_finished);
|
||||
}
|
||||
av_freep(&sch->enc);
|
||||
|
||||
@ -819,6 +828,10 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
|
||||
|
||||
task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
|
||||
|
||||
enc->send_pkt = av_packet_alloc();
|
||||
if (!enc->send_pkt)
|
||||
return AVERROR(ENOMEM);
|
||||
|
||||
ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
@ -1048,9 +1061,14 @@ int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
|
||||
enc = &sch->enc[src.idx];
|
||||
ms = &sch->mux[dst.idx].streams[dst.idx_stream];
|
||||
|
||||
av_assert0(!enc->dst.type && !ms->src.type);
|
||||
enc->dst = dst;
|
||||
ms->src = src;
|
||||
av_assert0(!ms->src.type);
|
||||
|
||||
ret = GROW_ARRAY(enc->dst, enc->nb_dst);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
enc->dst[enc->nb_dst - 1] = dst;
|
||||
ms->src = src;
|
||||
|
||||
break;
|
||||
}
|
||||
@ -1339,12 +1357,16 @@ int sch_start(Scheduler *sch)
|
||||
"Encoder not connected to a source\n");
|
||||
return AVERROR(EINVAL);
|
||||
}
|
||||
if (!enc->dst.type) {
|
||||
if (!enc->nb_dst) {
|
||||
av_log(enc, AV_LOG_ERROR,
|
||||
"Encoder not connected to a sink\n");
|
||||
"Encoder not connected to any sink\n");
|
||||
return AVERROR(EINVAL);
|
||||
}
|
||||
|
||||
enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
|
||||
if (!enc->dst_finished)
|
||||
return AVERROR(ENOMEM);
|
||||
|
||||
ret = task_start(&enc->task);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
@ -1518,15 +1540,21 @@ static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
|
||||
// TODO: consider a cleaner way of passing this information through
|
||||
// the pipeline
|
||||
if (!frame) {
|
||||
SchMux *mux = &sch->mux[enc->dst.idx];
|
||||
SchMuxStream *ms = &mux->streams[enc->dst.idx_stream];
|
||||
for (unsigned i = 0; i < enc->nb_dst; i++) {
|
||||
SchMux *mux;
|
||||
SchMuxStream *ms;
|
||||
|
||||
pthread_mutex_lock(&sch->schedule_lock);
|
||||
|
||||
ms->source_finished = 1;
|
||||
schedule_update_locked(sch);
|
||||
mux = &sch->mux[enc->dst[i].idx];
|
||||
ms = &mux->streams[enc->dst[i].idx_stream];
|
||||
|
||||
pthread_mutex_unlock(&sch->schedule_lock);
|
||||
pthread_mutex_lock(&sch->schedule_lock);
|
||||
|
||||
ms->source_finished = 1;
|
||||
schedule_update_locked(sch);
|
||||
|
||||
pthread_mutex_unlock(&sch->schedule_lock);
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&sq->lock);
|
||||
@ -2080,20 +2108,64 @@ int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
|
||||
uint8_t *dst_finished, AVPacket *pkt)
|
||||
{
|
||||
int ret;
|
||||
|
||||
if (*dst_finished)
|
||||
return AVERROR_EOF;
|
||||
|
||||
if (!pkt)
|
||||
goto finish;
|
||||
|
||||
ret = send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt);
|
||||
if (ret == AVERROR_EOF)
|
||||
goto finish;
|
||||
|
||||
return ret;
|
||||
|
||||
finish:
|
||||
send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
|
||||
|
||||
*dst_finished = 1;
|
||||
|
||||
return AVERROR_EOF;
|
||||
}
|
||||
|
||||
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
|
||||
{
|
||||
SchEnc *enc;
|
||||
int ret;
|
||||
unsigned nb_done = 0;
|
||||
|
||||
av_assert0(enc_idx < sch->nb_enc);
|
||||
enc = &sch->enc[enc_idx];
|
||||
|
||||
if (enc->out_finished)
|
||||
return pkt ? AVERROR_EOF : 0;
|
||||
for (unsigned i = 0; i < enc->nb_dst; i++) {
|
||||
uint8_t *finished = &enc->dst_finished[i];
|
||||
AVPacket *to_send = pkt;
|
||||
|
||||
ret = send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, pkt);
|
||||
if (ret < 0)
|
||||
enc->out_finished = 1;
|
||||
// sending a packet consumes it, so make a temporary reference if needed
|
||||
if (i < enc->nb_dst - 1) {
|
||||
to_send = enc->send_pkt;
|
||||
|
||||
ret = av_packet_ref(to_send, pkt);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
|
||||
if (ret < 0) {
|
||||
av_packet_unref(to_send);
|
||||
if (ret == AVERROR_EOF) {
|
||||
nb_done++;
|
||||
ret = 0;
|
||||
continue;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -2101,10 +2173,17 @@ int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
|
||||
static int enc_done(Scheduler *sch, unsigned enc_idx)
|
||||
{
|
||||
SchEnc *enc = &sch->enc[enc_idx];
|
||||
int ret = 0;
|
||||
|
||||
tq_receive_finish(enc->queue, 0);
|
||||
|
||||
return send_to_mux(sch, &sch->mux[enc->dst.idx], enc->dst.idx_stream, NULL);
|
||||
for (unsigned i = 0; i < enc->nb_dst; i++) {
|
||||
int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
|
||||
if (err < 0 && err != AVERROR_EOF)
|
||||
ret = err_merge(ret, err);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
|
||||
|
Loading…
x
Reference in New Issue
Block a user