diff --git a/Changelog b/Changelog index 65aa2f0232..f42e5e3cdc 100644 --- a/Changelog +++ b/Changelog @@ -54,6 +54,7 @@ version : - V4L2 output device - 3D LUT filter (lut3d) - SMPTE 302M audio encoder +- support for slice multithreading in libavfilter version 1.2: diff --git a/cmdutils.c b/cmdutils.c index 542634924e..abe0fdf6c3 100644 --- a/cmdutils.c +++ b/cmdutils.c @@ -1660,6 +1660,10 @@ static void show_help_filter(const char *name) printf("Filter %s\n", f->name); if (f->description) printf(" %s\n", f->description); + + if (f->flags & AVFILTER_FLAG_SLICE_THREADS) + printf(" slice threading supported\n"); + printf(" Inputs:\n"); count = avfilter_pad_count(f->inputs); for (i = 0; i < count; i++) { diff --git a/doc/APIchanges b/doc/APIchanges index 1143d56d41..33f1a1b777 100644 --- a/doc/APIchanges +++ b/doc/APIchanges @@ -16,6 +16,13 @@ libavutil: 2012-10-22 API changes, most recent first: +2013-05-24 - xxxxxxx - lavfi 3.70.100 - avfilter.h + Add support for slice multithreading to lavfi. Filters supporting threading + are marked with AVFILTER_FLAG_SLICE_THREADS. + New fields AVFilterContext.thread_type, AVFilterGraph.thread_type and + AVFilterGraph.nb_threads (accessible directly or through AVOptions) may be + used to configure multithreading. + 2013-05-24 - xxxxxxx - lavu 52.34.100 - cpu.h Add av_cpu_count() function for getting the number of logical CPUs. 
diff --git a/libavfilter/Makefile b/libavfilter/Makefile index 6cc2930f84..fa601b57fe 100644 --- a/libavfilter/Makefile +++ b/libavfilter/Makefile @@ -240,6 +240,8 @@ OBJS-$(CONFIG_MOVIE_FILTER) += src_movie.o SKIPHEADERS-$(CONFIG_LIBVIDSTAB) += vidstabutils.h SKIPHEADERS-$(CONFIG_OPENCL) += opencl_internal.h deshake_opencl_kernel.h unsharp_opencl_kernel.h +OBJS-$(HAVE_THREADS) += pthread.o + TOOLS = graph2dot TESTPROGS = drawutils filtfmts formats diff --git a/libavfilter/avfilter.c b/libavfilter/avfilter.c index 59fc0fa7c4..1ea6d8dbfe 100644 --- a/libavfilter/avfilter.c +++ b/libavfilter/avfilter.c @@ -538,9 +538,12 @@ static const AVClass *filter_child_class_next(const AVClass *prev) #define OFFSET(x) offsetof(AVFilterContext, x) #define FLAGS AV_OPT_FLAG_FILTERING_PARAM -static const AVOption filters_common_options[] = { +static const AVOption options[] = { + { "thread_type", "Allowed thread types", OFFSET(thread_type), AV_OPT_TYPE_FLAGS, + { .i64 = AVFILTER_THREAD_SLICE }, 0, INT_MAX, FLAGS, "thread_type" }, + { "slice", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = AVFILTER_THREAD_SLICE }, .unit = "thread_type" }, { "enable", "set enable expression", OFFSET(enable_str), AV_OPT_TYPE_STRING, {.str=NULL}, .flags = FLAGS }, - { NULL } + { NULL }, }; static const AVClass avfilter_class = { @@ -549,10 +552,23 @@ static const AVClass avfilter_class = { .version = LIBAVUTIL_VERSION_INT, .category = AV_CLASS_CATEGORY_FILTER, .child_next = filter_child_next, - .option = filters_common_options, .child_class_next = filter_child_class_next, + .option = options, }; +static int default_execute(AVFilterContext *ctx, action_func *func, void *arg, + int *ret, int nb_jobs) +{ + int i; + + for (i = 0; i < nb_jobs; i++) { + int r = func(ctx, arg, i, nb_jobs); + if (ret) + ret[i] = r; + } + return 0; +} + AVFilterContext *ff_filter_alloc(const AVFilter *filter, const char *inst_name) { AVFilterContext *ret; @@ -573,11 +589,17 @@ AVFilterContext *ff_filter_alloc(const AVFilter *filter, 
const char *inst_name) goto err; } + av_opt_set_defaults(ret); if (filter->priv_class) { *(const AVClass**)ret->priv = filter->priv_class; av_opt_set_defaults(ret->priv); } + ret->internal = av_mallocz(sizeof(*ret->internal)); + if (!ret->internal) + goto err; + ret->internal->execute = default_execute; + ret->nb_inputs = avfilter_pad_count(filter->inputs); if (ret->nb_inputs ) { ret->input_pads = av_malloc(sizeof(AVFilterPad) * ret->nb_inputs); @@ -614,6 +636,7 @@ err: av_freep(&ret->output_pads); ret->nb_outputs = 0; av_freep(&ret->priv); + av_freep(&ret->internal); av_free(ret); return NULL; } @@ -681,6 +704,7 @@ void avfilter_free(AVFilterContext *filter) av_expr_free(filter->enable); filter->enable = NULL; av_freep(&filter->var_values); + av_freep(&filter->internal); av_free(filter); } @@ -772,6 +796,21 @@ int avfilter_init_dict(AVFilterContext *ctx, AVDictionary **options) { int ret = 0; + ret = av_opt_set_dict(ctx, options); + if (ret < 0) { + av_log(ctx, AV_LOG_ERROR, "Error applying generic filter options.\n"); + return ret; + } + + if (ctx->filter->flags & AVFILTER_FLAG_SLICE_THREADS && + ctx->thread_type & ctx->graph->thread_type & AVFILTER_THREAD_SLICE && + ctx->graph->internal->thread_execute) { + ctx->thread_type = AVFILTER_THREAD_SLICE; + ctx->internal->execute = ctx->graph->internal->thread_execute; + } else { + ctx->thread_type = 0; + } + if (ctx->filter->priv_class) { ret = av_opt_set_dict(ctx->priv, options); if (ret < 0) { diff --git a/libavfilter/avfilter.h b/libavfilter/avfilter.h index 741ec350b8..54a28f0b04 100644 --- a/libavfilter/avfilter.h +++ b/libavfilter/avfilter.h @@ -428,6 +428,11 @@ enum AVMediaType avfilter_pad_get_type(const AVFilterPad *pads, int pad_idx); * the options supplied to it. */ #define AVFILTER_FLAG_DYNAMIC_OUTPUTS (1 << 1) +/** + * The filter supports multithreading by splitting frames into multiple parts + * and processing them concurrently. 
+ */ +#define AVFILTER_FLAG_SLICE_THREADS (1 << 2) /** * Some filters support a generic "enable" expression option that can be used * to enable or disable a filter in the timeline. Filters supporting this @@ -542,6 +547,13 @@ typedef struct AVFilter { int (*init_opaque)(AVFilterContext *ctx, void *opaque); } AVFilter; +/** + * Process multiple parts of the frame concurrently. + */ +#define AVFILTER_THREAD_SLICE (1 << 0) + +typedef struct AVFilterInternal AVFilterInternal; + /** An instance of a filter */ struct AVFilterContext { const AVClass *av_class; ///< needed for av_log() and filters common options @@ -568,6 +580,29 @@ struct AVFilterContext { struct AVFilterGraph *graph; ///< filtergraph this filter belongs to + /** + * Type of multithreading being allowed/used. A combination of + * AVFILTER_THREAD_* flags. + * + * May be set by the caller before initializing the filter to forbid some + * or all kinds of multithreading for this filter. The default is allowing + * everything. + * + * When the filter is initialized, this field is combined using bit AND with + * AVFilterGraph.thread_type to get the final mask used for determining + * allowed threading types. I.e. a threading type needs to be set in both + * to be allowed. + * + * After the filter is initialized, libavfilter sets this field to the + * threading type that is actually used (0 for no multithreading). + */ + int thread_type; + + /** + * An opaque struct for libavfilter internal use. 
+ */ + AVFilterInternal *internal; + struct AVFilterCommand *command_queue; char *enable_str; ///< enable expression string @@ -1020,6 +1055,8 @@ int avfilter_copy_buf_props(AVFrame *dst, const AVFilterBufferRef *src); */ const AVClass *avfilter_get_class(void); +typedef struct AVFilterGraphInternal AVFilterGraphInternal; + typedef struct AVFilterGraph { const AVClass *av_class; #if FF_API_FOO_COUNT @@ -1036,6 +1073,33 @@ typedef struct AVFilterGraph { #if FF_API_FOO_COUNT unsigned nb_filters; #endif + + /** + * Type of multithreading allowed for filters in this graph. A combination + * of AVFILTER_THREAD_* flags. + * + * May be set by the caller at any point, the setting will apply to all + * filters initialized after that. The default is allowing everything. + * + * When a filter in this graph is initialized, this field is combined using + * bit AND with AVFilterContext.thread_type to get the final mask used for + * determining allowed threading types. I.e. a threading type needs to be + * set in both to be allowed. + */ + int thread_type; + + /** + * Maximum number of threads used by filters in this graph. May be set by + * the caller before adding any filters to the filtergraph. Zero (the + * default) means that the number of threads is determined automatically. + */ + int nb_threads; + + /** + * Opaque object for libavfilter internal use. 
+ */ + AVFilterGraphInternal *internal; + char *aresample_swr_opts; ///< swr options to use for the auto-inserted aresample filters, Access ONLY through AVOptions /** diff --git a/libavfilter/avfiltergraph.c b/libavfilter/avfiltergraph.c index a9b91c3895..8c400028d8 100644 --- a/libavfilter/avfiltergraph.c +++ b/libavfilter/avfiltergraph.c @@ -20,6 +20,8 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include "config.h" + #include #include "libavutil/avassert.h" @@ -29,33 +31,63 @@ #include "libavutil/opt.h" #include "libavutil/pixdesc.h" #include "libavcodec/avcodec.h" // avcodec_find_best_pix_fmt_of_2() + #include "avfilter.h" #include "formats.h" #include "internal.h" +#include "thread.h" -#define OFFSET(x) offsetof(AVFilterGraph,x) - -static const AVOption options[]={ -{"scale_sws_opts" , "default scale filter options" , OFFSET(scale_sws_opts) , AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, 0 }, -{"aresample_swr_opts" , "default aresample filter options" , OFFSET(aresample_swr_opts) , AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, 0 }, -{0} +#define OFFSET(x) offsetof(AVFilterGraph, x) +#define FLAGS AV_OPT_FLAG_VIDEO_PARAM|AV_OPT_FLAG_FILTERING_PARAM +static const AVOption filtergraph_options[] = { + { "thread_type", "Allowed thread types", OFFSET(thread_type), AV_OPT_TYPE_FLAGS, + { .i64 = AVFILTER_THREAD_SLICE }, 0, INT_MAX, FLAGS, "thread_type" }, + { "slice", NULL, 0, AV_OPT_TYPE_CONST, { .i64 = AVFILTER_THREAD_SLICE }, .flags = FLAGS, .unit = "thread_type" }, + { "threads", "Maximum number of threads", OFFSET(nb_threads), + AV_OPT_TYPE_INT, { .i64 = 0 }, 0, INT_MAX, FLAGS }, + {"scale_sws_opts" , "default scale filter options" , OFFSET(scale_sws_opts) , + AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, FLAGS }, + {"aresample_swr_opts" , "default aresample filter options" , OFFSET(aresample_swr_opts) , + AV_OPT_TYPE_STRING, {.str = NULL}, 0, 0, FLAGS }, + { NULL }, }; - static const AVClass filtergraph_class = { .class_name = 
"AVFilterGraph", .item_name = av_default_item_name, - .option = options, .version = LIBAVUTIL_VERSION_INT, + .option = filtergraph_options, .category = AV_CLASS_CATEGORY_FILTER, }; +#if !HAVE_THREADS +void ff_graph_thread_free(AVFilterGraph *graph) +{ +} + +int ff_graph_thread_init(AVFilterGraph *graph) +{ + graph->thread_type = 0; + graph->nb_threads = 1; + return 0; +} +#endif + AVFilterGraph *avfilter_graph_alloc(void) { AVFilterGraph *ret = av_mallocz(sizeof(*ret)); if (!ret) return NULL; + + ret->internal = av_mallocz(sizeof(*ret->internal)); + if (!ret->internal) { + av_freep(&ret); + return NULL; + } + ret->av_class = &filtergraph_class; + av_opt_set_defaults(ret); + return ret; } @@ -80,11 +112,15 @@ void avfilter_graph_free(AVFilterGraph **graph) while ((*graph)->nb_filters) avfilter_free((*graph)->filters[0]); + ff_graph_thread_free(*graph); + av_freep(&(*graph)->sink_links); + av_freep(&(*graph)->scale_sws_opts); av_freep(&(*graph)->aresample_swr_opts); av_freep(&(*graph)->resample_lavr_opts); av_freep(&(*graph)->filters); + av_freep(&(*graph)->internal); av_freep(graph); } @@ -143,6 +179,14 @@ AVFilterContext *avfilter_graph_alloc_filter(AVFilterGraph *graph, { AVFilterContext **filters, *s; + if (graph->thread_type && !graph->internal->thread) { + int ret = ff_graph_thread_init(graph); + if (ret < 0) { + av_log(graph, AV_LOG_ERROR, "Error initializing threading.\n"); + return NULL; + } + } + s = ff_filter_alloc(filter, name); if (!s) return NULL; diff --git a/libavfilter/internal.h b/libavfilter/internal.h index fbe935667f..f7df6d3b41 100644 --- a/libavfilter/internal.h +++ b/libavfilter/internal.h @@ -27,6 +27,7 @@ #include "avfilter.h" #include "avfiltergraph.h" #include "formats.h" +#include "thread.h" #include "video.h" #define POOL_SIZE 32 @@ -141,6 +142,17 @@ struct AVFilterPad { }; #endif +struct AVFilterGraphInternal { + void *thread; + int (*thread_execute)(AVFilterContext *ctx, action_func *func, void *arg, + int *ret, int nb_jobs); +}; + 
+struct AVFilterInternal { + int (*execute)(AVFilterContext *ctx, action_func *func, void *arg, + int *ret, int nb_jobs); +}; + /** default handler for freeing audio/video buffer when there are no references left */ void ff_avfilter_default_free_buffer(AVFilterBuffer *buf); diff --git a/libavfilter/pthread.c b/libavfilter/pthread.c new file mode 100644 index 0000000000..6ceb8e65e3 --- /dev/null +++ b/libavfilter/pthread.c @@ -0,0 +1,229 @@ +/* + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @file + * Libavfilter multithreading support + */ + +#include "config.h" + +#include "libavutil/common.h" +#include "libavutil/cpu.h" +#include "libavutil/mem.h" + +#include "avfilter.h" +#include "internal.h" +#include "thread.h" + +#if HAVE_PTHREADS +#include +#elif HAVE_W32THREADS +#include "compat/w32pthreads.h" +#endif + +typedef struct ThreadContext { + AVFilterGraph *graph; + + int nb_threads; + pthread_t *workers; + action_func *func; + + /* per-execute parameters */ + AVFilterContext *ctx; + void *arg; + int *rets; + int nb_rets; + int nb_jobs; + + pthread_cond_t last_job_cond; + pthread_cond_t current_job_cond; + pthread_mutex_t current_job_lock; + int current_job; + int done; +} ThreadContext; + +static void* attribute_align_arg worker(void *v) +{ + ThreadContext *c = v; + int our_job = c->nb_jobs; + int nb_threads = c->nb_threads; + int self_id; + + pthread_mutex_lock(&c->current_job_lock); + self_id = c->current_job++; + for (;;) { + while (our_job >= c->nb_jobs) { + if (c->current_job == nb_threads + c->nb_jobs) + pthread_cond_signal(&c->last_job_cond); + + pthread_cond_wait(&c->current_job_cond, &c->current_job_lock); + our_job = self_id; + + if (c->done) { + pthread_mutex_unlock(&c->current_job_lock); + return NULL; + } + } + pthread_mutex_unlock(&c->current_job_lock); + + c->rets[our_job % c->nb_rets] = c->func(c->ctx, c->arg, our_job, c->nb_jobs); + + pthread_mutex_lock(&c->current_job_lock); + our_job = c->current_job++; + } +} + +static void slice_thread_uninit(ThreadContext *c) +{ + int i; + + pthread_mutex_lock(&c->current_job_lock); + c->done = 1; + pthread_cond_broadcast(&c->current_job_cond); + pthread_mutex_unlock(&c->current_job_lock); + + for (i = 0; i < c->nb_threads; i++) + pthread_join(c->workers[i], 
NULL); + + pthread_mutex_destroy(&c->current_job_lock); + pthread_cond_destroy(&c->current_job_cond); + pthread_cond_destroy(&c->last_job_cond); + av_freep(&c->workers); +} + +static void slice_thread_park_workers(ThreadContext *c) +{ + pthread_cond_wait(&c->last_job_cond, &c->current_job_lock); + pthread_mutex_unlock(&c->current_job_lock); +} + +static int thread_execute(AVFilterContext *ctx, action_func *func, + void *arg, int *ret, int nb_jobs) +{ + ThreadContext *c = ctx->graph->internal->thread; + int dummy_ret; + + if (nb_jobs <= 0) + return 0; + + pthread_mutex_lock(&c->current_job_lock); + + c->current_job = c->nb_threads; + c->nb_jobs = nb_jobs; + c->ctx = ctx; + c->arg = arg; + c->func = func; + if (ret) { + c->rets = ret; + c->nb_rets = nb_jobs; + } else { + c->rets = &dummy_ret; + c->nb_rets = 1; + } + pthread_cond_broadcast(&c->current_job_cond); + + slice_thread_park_workers(c); + + return 0; +} + +static int thread_init(ThreadContext *c, int nb_threads) +{ + int i, ret; + + if (!nb_threads) { + int nb_cpus = av_cpu_count(); + av_log(c->graph, AV_LOG_DEBUG, "Detected %d logical cores.\n", nb_cpus); + // use number of cores + 1 as thread count if there is more than one + if (nb_cpus > 1) + nb_threads = nb_cpus + 1; + else + nb_threads = 1; + } + + if (nb_threads <= 1) + return 1; + + c->nb_threads = nb_threads; + c->workers = av_mallocz(sizeof(*c->workers) * nb_threads); + if (!c->workers) + return AVERROR(ENOMEM); + + c->current_job = 0; + c->nb_jobs = 0; + c->done = 0; + + pthread_cond_init(&c->current_job_cond, NULL); + pthread_cond_init(&c->last_job_cond, NULL); + + pthread_mutex_init(&c->current_job_lock, NULL); + pthread_mutex_lock(&c->current_job_lock); + for (i = 0; i < nb_threads; i++) { + ret = pthread_create(&c->workers[i], NULL, worker, c); + if (ret) { + pthread_mutex_unlock(&c->current_job_lock); + c->nb_threads = i; + slice_thread_uninit(c); + return AVERROR(ret); + } + } + + slice_thread_park_workers(c); + + return c->nb_threads; +} + 
+int ff_graph_thread_init(AVFilterGraph *graph) +{ + int ret; + +#if HAVE_W32THREADS + w32thread_init(); +#endif + + if (graph->nb_threads == 1) { + graph->thread_type = 0; + return 0; + } + + graph->internal->thread = av_mallocz(sizeof(ThreadContext)); + if (!graph->internal->thread) + return AVERROR(ENOMEM); + + ret = thread_init(graph->internal->thread, graph->nb_threads); + if (ret <= 1) { + av_freep(&graph->internal->thread); + graph->thread_type = 0; + graph->nb_threads = 1; + return (ret < 0) ? ret : 0; + } + graph->nb_threads = ret; + + graph->internal->thread_execute = thread_execute; + + return 0; +} + +void ff_graph_thread_free(AVFilterGraph *graph) +{ + if (graph->internal->thread) + slice_thread_uninit(graph->internal->thread); + av_freep(&graph->internal->thread); +} diff --git a/libavfilter/thread.h b/libavfilter/thread.h new file mode 100644 index 0000000000..0dc3aaacb1 --- /dev/null +++ b/libavfilter/thread.h @@ -0,0 +1,31 @@ +/* + * + * This file is part of FFmpeg. + * + * FFmpeg is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * FFmpeg is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with FFmpeg; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef AVFILTER_THREAD_H +#define AVFILTER_THREAD_H + +#include "avfilter.h" + +typedef int (action_func)(AVFilterContext *ctx, void *arg, int jobnr, int nb_jobs); + +int ff_graph_thread_init(AVFilterGraph *graph); + +void ff_graph_thread_free(AVFilterGraph *graph); + +#endif /* AVFILTER_THREAD_H */ diff --git a/libavfilter/version.h b/libavfilter/version.h index 9043b5058a..9b1c881a21 100644 --- a/libavfilter/version.h +++ b/libavfilter/version.h @@ -30,8 +30,8 @@ #include "libavutil/avutil.h" #define LIBAVFILTER_VERSION_MAJOR 3 -#define LIBAVFILTER_VERSION_MINOR 69 -#define LIBAVFILTER_VERSION_MICRO 101 +#define LIBAVFILTER_VERSION_MINOR 70 +#define LIBAVFILTER_VERSION_MICRO 100 #define LIBAVFILTER_VERSION_INT AV_VERSION_INT(LIBAVFILTER_VERSION_MAJOR, \ LIBAVFILTER_VERSION_MINOR, \