diff --git a/libavformat/applehttp.c b/libavformat/applehttp.c index 815013d5e2..7ab3ab7372 100644 --- a/libavformat/applehttp.c +++ b/libavformat/applehttp.c @@ -30,6 +30,9 @@ #include "avformat.h" #include "internal.h" #include +#include "avio_internal.h" + +#define INITIAL_BUFFER_SIZE 32768 /* * An apple http stream consists of a playlist with media segment files, @@ -56,7 +59,11 @@ struct segment { struct variant { int bandwidth; char url[MAX_URL_SIZE]; - AVIOContext *pb; + AVIOContext pb; + uint8_t* read_buffer; + URLContext *input; + AVFormatContext *parent; + int index; AVFormatContext *ctx; AVPacket pkt; int stream_offset; @@ -66,16 +73,17 @@ struct variant { int start_seq_no; int n_segments; struct segment **segments; - int needed; + int needed, cur_needed; + int cur_seq_no; + int64_t last_load_time; }; typedef struct AppleHTTPContext { int n_variants; struct variant **variants; int cur_seq_no; - int64_t last_load_time; - int64_t last_packet_dts; - int max_start_seq, min_end_seq; + int end_of_segment; + int first_packet; } AppleHTTPContext; static int read_chomp_line(AVIOContext *s, char *buf, int maxlen) @@ -102,8 +110,9 @@ static void free_variant_list(AppleHTTPContext *c) struct variant *var = c->variants[i]; free_segment_list(var); av_free_packet(&var->pkt); - if (var->pb) - avio_close(var->pb); + av_free(var->pb.buffer); + if (var->input) + url_close(var->input); if (var->ctx) { var->ctx->pb = NULL; av_close_input_file(var->ctx); @@ -238,7 +247,8 @@ static int parse_playlist(AppleHTTPContext *c, const char *url, } } } - c->last_load_time = av_gettime(); + if (var) + var->last_load_time = av_gettime(); fail: if (close_in) @@ -246,6 +256,71 @@ fail: return ret; } +static int read_data(void *opaque, uint8_t *buf, int buf_size) +{ + struct variant *v = opaque; + AppleHTTPContext *c = v->parent->priv_data; + int ret, i; + +restart: + if (!v->input) { +reload: + /* If this is a live stream and target_duration has elapsed since + * the last playlist reload, reload the variant playlists now. */ + if (!v->finished && + av_gettime() - v->last_load_time >= v->target_duration*1000000 && + (ret = parse_playlist(c, v->url, v, NULL)) < 0) + return ret; + if (v->cur_seq_no < v->start_seq_no) { + av_log(NULL, AV_LOG_WARNING, + "skipping %d segments ahead, expired from playlists\n", + v->start_seq_no - v->cur_seq_no); + v->cur_seq_no = v->start_seq_no; + } + if (v->cur_seq_no >= v->start_seq_no + v->n_segments) { + if (v->finished) + return AVERROR_EOF; + while (av_gettime() - v->last_load_time < + v->target_duration*1000000) { + if (url_interrupt_cb()) + return AVERROR_EXIT; + usleep(100*1000); + } + /* Enough time has elapsed since the last reload */ + goto reload; + } + + ret = url_open(&v->input, + v->segments[v->cur_seq_no - v->start_seq_no]->url, + URL_RDONLY); + if (ret < 0) + return ret; + } + ret = url_read(v->input, buf, buf_size); + if (ret > 0) + return ret; + if (ret < 0 && ret != AVERROR_EOF) + return ret; + url_close(v->input); + v->input = NULL; + v->cur_seq_no++; + + c->end_of_segment = 1; + c->cur_seq_no = v->cur_seq_no; + + v->needed = 0; + for (i = v->stream_offset; i < v->stream_offset + v->ctx->nb_streams; i++) { + if (v->parent->streams[i]->discard < AVDISCARD_ALL) + v->needed = 1; + } + if (!v->needed) { + av_log(v->parent, AV_LOG_INFO, "No longer receiving variant %d\n", + v->index); + return AVERROR_EOF; + } + goto restart; +} + static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap) { AppleHTTPContext *c = s->priv_data; @@ -284,20 +359,35 @@ static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap) s->duration = duration * AV_TIME_BASE; } - c->min_end_seq = INT_MAX; /* Open the demuxer for each variant */ for (i = 0; i < c->n_variants; i++) { struct variant *v = c->variants[i]; + AVInputFormat *in_fmt = NULL; if (v->n_segments == 0) continue; - c->max_start_seq = FFMAX(c->max_start_seq, v->start_seq_no); - c->min_end_seq = FFMIN(c->min_end_seq, v->start_seq_no + - v->n_segments); - ret = av_open_input_file(&v->ctx, v->segments[0]->url, NULL, 0, NULL); + + v->index = i; + v->needed = 1; + v->parent = s; + + /* If this is a live stream with more than 3 segments, start at the + * third last segment. */ + v->cur_seq_no = v->start_seq_no; + if (!v->finished && v->n_segments > 3) + v->cur_seq_no = v->start_seq_no + v->n_segments - 3; + + v->read_buffer = av_malloc(INITIAL_BUFFER_SIZE); + ffio_init_context(&v->pb, v->read_buffer, INITIAL_BUFFER_SIZE, 0, v, + read_data, NULL, NULL); + v->pb.seekable = 0; + ret = av_probe_input_buffer(&v->pb, &in_fmt, v->segments[0]->url, + NULL, 0, 0); + if (ret < 0) + goto fail; + ret = av_open_input_stream(&v->ctx, &v->pb, v->segments[0]->url, + in_fmt, NULL); if (ret < 0) goto fail; - avio_close(v->ctx->pb); - v->ctx->pb = NULL; v->stream_offset = stream_offset; /* Create new AVStreams for each stream in this variant */ for (j = 0; j < v->ctx->nb_streams; j++) { @@ -310,13 +400,8 @@ static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap) } stream_offset += v->ctx->nb_streams; } - c->last_packet_dts = AV_NOPTS_VALUE; - c->cur_seq_no = c->max_start_seq; - /* If this is a live stream with more than 3 segments, start at the - * third last segment. */ - if (!c->variants[0]->finished && c->min_end_seq - c->max_start_seq > 3) - c->cur_seq_no = c->min_end_seq - 2; + c->first_packet = 1; return 0; fail: @@ -324,98 +409,61 @@ fail: return ret; } -static int open_variant(AppleHTTPContext *c, struct variant *var, int skip) +static int recheck_discard_flags(AVFormatContext *s, int first) { - int ret; + AppleHTTPContext *c = s->priv_data; + int i, changed = 0; - if (c->cur_seq_no < var->start_seq_no) { - av_log(NULL, AV_LOG_WARNING, - "seq %d not available in variant %s, skipping\n", - var->start_seq_no, var->url); - return 0; + /* Check if any new streams are needed */ + for (i = 0; i < c->n_variants; i++) + c->variants[i]->cur_needed = 0;; + + for (i = 0; i < s->nb_streams; i++) { + AVStream *st = s->streams[i]; + struct variant *var = c->variants[s->streams[i]->id]; + if (st->discard < AVDISCARD_ALL) + var->cur_needed = 1; } - if (c->cur_seq_no - var->start_seq_no >= var->n_segments) - return c->variants[0]->finished ? AVERROR_EOF : 0; - ret = avio_open(&var->pb, - var->segments[c->cur_seq_no - var->start_seq_no]->url, - URL_RDONLY); - if (ret < 0) - return ret; - var->ctx->pb = var->pb; - /* If this is a new segment in parallel with another one already opened, - * skip ahead so they're all at the same dts. */ - if (skip && c->last_packet_dts != AV_NOPTS_VALUE) { - while (1) { - ret = av_read_frame(var->ctx, &var->pkt); - if (ret < 0) { - if (ret == AVERROR_EOF) { - reset_packet(&var->pkt); - return 0; - } - return ret; - } - if (var->pkt.dts >= c->last_packet_dts) - break; - av_free_packet(&var->pkt); + for (i = 0; i < c->n_variants; i++) { + struct variant *v = c->variants[i]; + if (v->cur_needed && !v->needed) { + v->needed = 1; + changed = 1; + v->cur_seq_no = c->cur_seq_no; + v->pb.eof_reached = 0; + av_log(s, AV_LOG_INFO, "Now receiving variant %d\n", i); + } else if (first && !v->cur_needed && v->needed) { + if (v->input) + url_close(v->input); + v->input = NULL; + v->needed = 0; + changed = 1; + av_log(s, AV_LOG_INFO, "No longer receiving variant %d\n", i); } } - return 0; + return changed; } static int applehttp_read_packet(AVFormatContext *s, AVPacket *pkt) { AppleHTTPContext *c = s->priv_data; - int ret, i, minvariant = -1, first = 1, needed = 0, changed = 0, - variants = 0; + int ret, i, minvariant = -1; - /* Recheck the discard flags - which streams are desired at the moment */ - for (i = 0; i < c->n_variants; i++) - c->variants[i]->needed = 0; - for (i = 0; i < s->nb_streams; i++) { - AVStream *st = s->streams[i]; - struct variant *var = c->variants[s->streams[i]->id]; - if (st->discard < AVDISCARD_ALL) { - var->needed = 1; - needed++; - } - /* Copy the discard flag to the chained demuxer, to indicate which - * streams are desired. */ - var->ctx->streams[i - var->stream_offset]->discard = st->discard; + if (c->first_packet) { + recheck_discard_flags(s, 1); + c->first_packet = 0; } - if (!needed) - return AVERROR_EOF; + start: + c->end_of_segment = 0; for (i = 0; i < c->n_variants; i++) { struct variant *var = c->variants[i]; - /* Close unneeded streams, open newly requested streams */ - if (var->pb && !var->needed) { - av_log(s, AV_LOG_DEBUG, - "Closing variant stream %d, no longer needed\n", i); - av_free_packet(&var->pkt); - reset_packet(&var->pkt); - avio_close(var->pb); - var->pb = NULL; - changed = 1; - } else if (!var->pb && var->needed) { - if (first) - av_log(s, AV_LOG_DEBUG, "Opening variant stream %d\n", i); - if (first && !var->finished) - if ((ret = parse_playlist(c, var->url, var, NULL)) < 0) - return ret; - ret = open_variant(c, var, first); - if (ret < 0) - return ret; - changed = 1; - } - /* Count the number of open variants */ - if (var->pb) - variants++; /* Make sure we've got one buffered packet from each open variant * stream */ - if (var->pb && !var->pkt.data) { + if (var->needed && !var->pkt.data) { ret = av_read_frame(var->ctx, &var->pkt); if (ret < 0) { - if (!var->pb->eof_reached) + if (!var->pb.eof_reached) return ret; reset_packet(&var->pkt); } @@ -427,71 +475,18 @@ start: minvariant = i; } } - if (first && changed) - av_log(s, AV_LOG_INFO, "Receiving %d variant streams\n", variants); + if (c->end_of_segment) { + if (recheck_discard_flags(s, 0)) + goto start; + } /* If we got a packet, return it */ if (minvariant >= 0) { *pkt = c->variants[minvariant]->pkt; pkt->stream_index += c->variants[minvariant]->stream_offset; reset_packet(&c->variants[minvariant]->pkt); - c->last_packet_dts = pkt->dts; return 0; } - /* No more packets - eof reached in all variant streams, close the - * current segments. */ - for (i = 0; i < c->n_variants; i++) { - struct variant *var = c->variants[i]; - if (var->pb) { - avio_close(var->pb); - var->pb = NULL; - } - } - /* Indicate that we're opening the next segment, not opening a new - * variant stream in parallel, so we shouldn't try to skip ahead. */ - first = 0; - c->cur_seq_no++; -reload: - if (!c->variants[0]->finished) { - /* If this is a live stream and target_duration has elapsed since - * the last playlist reload, reload the variant playlists now. */ - int64_t now = av_gettime(); - if (now - c->last_load_time >= c->variants[0]->target_duration*1000000) { - c->max_start_seq = 0; - c->min_end_seq = INT_MAX; - for (i = 0; i < c->n_variants; i++) { - struct variant *var = c->variants[i]; - if (var->needed) { - if ((ret = parse_playlist(c, var->url, var, NULL)) < 0) - return ret; - c->max_start_seq = FFMAX(c->max_start_seq, - var->start_seq_no); - c->min_end_seq = FFMIN(c->min_end_seq, - var->start_seq_no + var->n_segments); - } - } - } - } - if (c->cur_seq_no < c->max_start_seq) { - av_log(NULL, AV_LOG_WARNING, - "skipping %d segments ahead, expired from playlists\n", - c->max_start_seq - c->cur_seq_no); - c->cur_seq_no = c->max_start_seq; - } - /* If more segments exist, open the next one */ - if (c->cur_seq_no < c->min_end_seq) - goto start; - /* We've reached the end of the playlists - return eof if this is a - * non-live stream, wait until the next playlist reload if it is live. */ - if (c->variants[0]->finished) - return AVERROR_EOF; - while (av_gettime() - c->last_load_time < - c->variants[0]->target_duration*1000000) { - if (url_interrupt_cb()) - return AVERROR_EXIT; - usleep(100*1000); - } - /* Enough time has elapsed since the last reload */ - goto reload; + return AVERROR_EOF; } static int applehttp_close(AVFormatContext *s) @@ -506,38 +501,43 @@ static int applehttp_read_seek(AVFormatContext *s, int stream_index, int64_t timestamp, int flags) { AppleHTTPContext *c = s->priv_data; - int64_t pos = 0; - int i; - struct variant *var = c->variants[0]; + int i, j, ret; if ((flags & AVSEEK_FLAG_BYTE) || !c->variants[0]->finished) return AVERROR(ENOSYS); /* Reset the variants */ - c->last_packet_dts = AV_NOPTS_VALUE; for (i = 0; i < c->n_variants; i++) { struct variant *var = c->variants[i]; - if (var->pb) { - avio_close(var->pb); - var->pb = NULL; + if (var->input) { + url_close(var->input); + var->input = NULL; } av_free_packet(&var->pkt); reset_packet(&var->pkt); + var->pb.eof_reached = 0; } timestamp = av_rescale_rnd(timestamp, 1, stream_index >= 0 ? s->streams[stream_index]->time_base.den : AV_TIME_BASE, flags & AVSEEK_FLAG_BACKWARD ? AV_ROUND_DOWN : AV_ROUND_UP); - /* Locate the segment that contains the target timestamp */ - for (i = 0; i < var->n_segments; i++) { - if (timestamp >= pos && timestamp < pos + var->segments[i]->duration) { - c->cur_seq_no = var->start_seq_no + i; - return 0; + ret = AVERROR(EIO); + for (i = 0; i < c->n_variants; i++) { + struct variant *var = c->variants[i]; + int64_t pos = 0; + /* Locate the segment that contains the target timestamp */ + for (j = 0; j < var->n_segments; j++) { + if (timestamp >= pos && + timestamp < pos + var->segments[j]->duration) { + var->cur_seq_no = var->start_seq_no + j; + ret = 0; + break; + } + pos += var->segments[j]->duration; } - pos += var->segments[i]->duration; } - return AVERROR(EIO); + return ret; } static int applehttp_probe(AVProbeData *p)