1
0
mirror of https://github.com/FFmpeg/FFmpeg.git synced 2024-12-28 20:53:54 +02:00

fftools/ffmpeg: move threading fields from InputFile to Demuxer

They are private to the demuxer and do not need to be visible outside of
it.
This commit is contained in:
Anton Khirnov 2022-10-17 12:49:51 +02:00
parent c20977c4e0
commit 5bc1f177d3
2 changed files with 27 additions and 27 deletions

View File

@ -462,11 +462,6 @@ typedef struct InputFile {
float readrate; float readrate;
int accurate_seek; int accurate_seek;
AVThreadMessageQueue *in_thread_queue;
pthread_t thread; /* thread reading from this file */
int non_blocking; /* reading packets from the thread should not block */
int thread_queue_size; /* maximum number of queued packets */
/* when looping the input file, this queue is used by decoders to report /* when looping the input file, this queue is used by decoders to report
* the last frame duration back to the demuxer thread */ * the last frame duration back to the demuxer thread */
AVThreadMessageQueue *audio_duration_queue; AVThreadMessageQueue *audio_duration_queue;

View File

@ -57,6 +57,11 @@ typedef struct Demuxer {
/* number of times input stream should be looped */ /* number of times input stream should be looped */
int loop; int loop;
AVThreadMessageQueue *in_thread_queue;
int thread_queue_size;
pthread_t thread;
int non_blocking;
} Demuxer; } Demuxer;
typedef struct DemuxMsg { typedef struct DemuxMsg {
@ -225,7 +230,7 @@ static void *input_thread(void *arg)
Demuxer *d = arg; Demuxer *d = arg;
InputFile *f = &d->f; InputFile *f = &d->f;
AVPacket *pkt; AVPacket *pkt;
unsigned flags = f->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0; unsigned flags = d->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0;
int ret = 0; int ret = 0;
pkt = av_packet_alloc(); pkt = av_packet_alloc();
@ -249,7 +254,7 @@ static void *input_thread(void *arg)
if (d->loop) { if (d->loop) {
/* signal looping to the consumer thread */ /* signal looping to the consumer thread */
msg.looping = 1; msg.looping = 1;
ret = av_thread_message_queue_send(f->in_thread_queue, &msg, 0); ret = av_thread_message_queue_send(d->in_thread_queue, &msg, 0);
if (ret >= 0) if (ret >= 0)
ret = seek_to_start(d); ret = seek_to_start(d);
if (ret >= 0) if (ret >= 0)
@ -294,14 +299,14 @@ static void *input_thread(void *arg)
break; break;
} }
av_packet_move_ref(msg.pkt, pkt); av_packet_move_ref(msg.pkt, pkt);
ret = av_thread_message_queue_send(f->in_thread_queue, &msg, flags); ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags);
if (flags && ret == AVERROR(EAGAIN)) { if (flags && ret == AVERROR(EAGAIN)) {
flags = 0; flags = 0;
ret = av_thread_message_queue_send(f->in_thread_queue, &msg, flags); ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags);
av_log(f->ctx, AV_LOG_WARNING, av_log(f->ctx, AV_LOG_WARNING,
"Thread message queue blocking; consider raising the " "Thread message queue blocking; consider raising the "
"thread_queue_size option (current value: %d)\n", "thread_queue_size option (current value: %d)\n",
f->thread_queue_size); d->thread_queue_size);
} }
if (ret < 0) { if (ret < 0) {
if (ret != AVERROR_EOF) if (ret != AVERROR_EOF)
@ -315,7 +320,7 @@ static void *input_thread(void *arg)
finish: finish:
av_assert0(ret < 0); av_assert0(ret < 0);
av_thread_message_queue_set_err_recv(f->in_thread_queue, ret); av_thread_message_queue_set_err_recv(d->in_thread_queue, ret);
av_packet_free(&pkt); av_packet_free(&pkt);
@ -327,14 +332,14 @@ static void thread_stop(Demuxer *d)
InputFile *f = &d->f; InputFile *f = &d->f;
DemuxMsg msg; DemuxMsg msg;
if (!f->in_thread_queue) if (!d->in_thread_queue)
return; return;
av_thread_message_queue_set_err_send(f->in_thread_queue, AVERROR_EOF); av_thread_message_queue_set_err_send(d->in_thread_queue, AVERROR_EOF);
while (av_thread_message_queue_recv(f->in_thread_queue, &msg, 0) >= 0) while (av_thread_message_queue_recv(d->in_thread_queue, &msg, 0) >= 0)
av_packet_free(&msg.pkt); av_packet_free(&msg.pkt);
pthread_join(f->thread, NULL); pthread_join(d->thread, NULL);
av_thread_message_queue_free(&f->in_thread_queue); av_thread_message_queue_free(&d->in_thread_queue);
av_thread_message_queue_free(&f->audio_duration_queue); av_thread_message_queue_free(&f->audio_duration_queue);
} }
@ -343,14 +348,14 @@ static int thread_start(Demuxer *d)
int ret; int ret;
InputFile *f = &d->f; InputFile *f = &d->f;
if (f->thread_queue_size <= 0) if (d->thread_queue_size <= 0)
f->thread_queue_size = (nb_input_files > 1 ? 8 : 1); d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
if (f->ctx->pb ? !f->ctx->pb->seekable : if (f->ctx->pb ? !f->ctx->pb->seekable :
strcmp(f->ctx->iformat->name, "lavfi")) strcmp(f->ctx->iformat->name, "lavfi"))
f->non_blocking = 1; d->non_blocking = 1;
ret = av_thread_message_queue_alloc(&f->in_thread_queue, ret = av_thread_message_queue_alloc(&d->in_thread_queue,
f->thread_queue_size, sizeof(DemuxMsg)); d->thread_queue_size, sizeof(DemuxMsg));
if (ret < 0) if (ret < 0)
return ret; return ret;
@ -372,7 +377,7 @@ static int thread_start(Demuxer *d)
} }
} }
if ((ret = pthread_create(&f->thread, NULL, input_thread, d))) { if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) {
av_log(NULL, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); av_log(NULL, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
ret = AVERROR(ret); ret = AVERROR(ret);
goto fail; goto fail;
@ -380,7 +385,7 @@ static int thread_start(Demuxer *d)
return 0; return 0;
fail: fail:
av_thread_message_queue_free(&f->in_thread_queue); av_thread_message_queue_free(&d->in_thread_queue);
return ret; return ret;
} }
@ -391,7 +396,7 @@ int ifile_get_packet(InputFile *f, AVPacket **pkt)
DemuxMsg msg; DemuxMsg msg;
int ret; int ret;
if (!f->in_thread_queue) { if (!d->in_thread_queue) {
ret = thread_start(d); ret = thread_start(d);
if (ret < 0) if (ret < 0)
return ret; return ret;
@ -416,8 +421,8 @@ int ifile_get_packet(InputFile *f, AVPacket **pkt)
} }
} }
ret = av_thread_message_queue_recv(f->in_thread_queue, &msg, ret = av_thread_message_queue_recv(d->in_thread_queue, &msg,
f->non_blocking ? d->non_blocking ?
AV_THREAD_MESSAGE_NONBLOCK : 0); AV_THREAD_MESSAGE_NONBLOCK : 0);
if (ret < 0) if (ret < 0)
return ret; return ret;
@ -1019,7 +1024,7 @@ int ifile_open(OptionsContext *o, const char *filename)
f->rate_emu = 0; f->rate_emu = 0;
} }
f->thread_queue_size = o->thread_queue_size; d->thread_queue_size = o->thread_queue_size;
/* check if all codec options have been used */ /* check if all codec options have been used */
unused_opts = strip_specifiers(o->g->codec_opts); unused_opts = strip_specifiers(o->g->codec_opts);