diff --git a/libavformat/udp.c b/libavformat/udp.c index 3cdee067f2..d360bd6057 100644 --- a/libavformat/udp.c +++ b/libavformat/udp.c @@ -69,6 +69,8 @@ typedef struct { int circular_buffer_error; #if HAVE_PTHREADS pthread_t circular_buffer_thread; + pthread_mutex_t mutex; + pthread_cond_t cond; #endif uint8_t tmp[UDP_MAX_PKT_SIZE+4]; int remaining_in_dg; @@ -317,6 +319,7 @@ static int udp_get_file_handle(URLContext *h) return s->udp_fd; } +#if HAVE_PTHREADS static void *circular_buffer_task( void *_URLContext) { URLContext *h = _URLContext; @@ -331,7 +334,7 @@ static void *circular_buffer_task( void *_URLContext) if (ff_check_interrupt(&h->interrupt_callback)) { s->circular_buffer_error = EINTR; - return NULL; + goto end; } FD_ZERO(&rfds); @@ -343,7 +346,7 @@ static void *circular_buffer_task( void *_URLContext) if (ff_neterrno() == AVERROR(EINTR)) continue; s->circular_buffer_error = EIO; - return NULL; + goto end; } if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds))) @@ -357,23 +360,31 @@ static void *circular_buffer_task( void *_URLContext) if(left < UDP_MAX_PKT_SIZE + 4) { av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n"); s->circular_buffer_error = EIO; - return NULL; + goto end; } left = FFMIN(left, s->fifo->end - s->fifo->wptr); len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0); if (len < 0) { if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) { s->circular_buffer_error = EIO; - return NULL; + goto end; } continue; } AV_WL32(s->tmp, len); + pthread_mutex_lock(&s->mutex); av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL); + pthread_cond_signal(&s->cond); + pthread_mutex_unlock(&s->mutex); } +end: + pthread_mutex_lock(&s->mutex); + pthread_cond_signal(&s->cond); + pthread_mutex_unlock(&s->mutex); return NULL; } +#endif /* put it in UDP context */ /* return non zero if error */ @@ -516,6 +527,8 @@ static int udp_open(URLContext *h, const char *uri, int flags) if (!is_output && s->circular_buffer_size) { /* start the task going */ s->fifo = av_fifo_alloc(s->circular_buffer_size); + pthread_mutex_init(&s->mutex, NULL); + pthread_cond_init(&s->cond, NULL); if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) { av_log(h, AV_LOG_ERROR, "pthread_create failed\n"); goto fail; @@ -536,15 +549,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size) UDPContext *s = h->priv_data; int ret; int avail; - fd_set rfds; - struct timeval tv; +#if HAVE_PTHREADS if (s->fifo) { - + pthread_mutex_lock(&s->mutex); do { avail = av_fifo_size(s->fifo); if (avail) { // >=size) { uint8_t tmp[4]; + pthread_mutex_unlock(&s->mutex); av_fifo_generic_read(s->fifo, tmp, 4, NULL); avail= AV_RL32(tmp); @@ -557,19 +570,15 @@ static int udp_read(URLContext *h, uint8_t *buf, int size) av_fifo_drain(s->fifo, AV_RL32(tmp) - avail); return avail; } else if(s->circular_buffer_error){ + pthread_mutex_unlock(&s->mutex); return s->circular_buffer_error; } else { - FD_ZERO(&rfds); - FD_SET(s->udp_fd, &rfds); - tv.tv_sec = 1; - tv.tv_usec = 0; - ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv); - if (ret<0) - return ret; + pthread_cond_wait(&s->cond, &s->mutex); } } while( 1); } +#endif if (!(h->flags & AVIO_FLAG_NONBLOCK)) { ret = ff_network_wait_fd(s->udp_fd, 0); @@ -610,6 +619,10 @@ static int udp_close(URLContext *h) udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr); closesocket(s->udp_fd); av_fifo_free(s->fifo); +#if HAVE_PTHREADS + pthread_mutex_destroy(&s->mutex); + pthread_cond_destroy(&s->cond); +#endif return 0; }