1
0
mirror of https://github.com/FFmpeg/FFmpeg.git synced 2025-08-04 22:03:09 +02:00

avformat/udp: make recv addr of each packet available

This commit is contained in:
Timo Rothenpieler
2025-06-28 20:08:14 +02:00
parent 2c7e117fe0
commit af04a27893
2 changed files with 36 additions and 18 deletions

View File

@ -338,4 +338,6 @@ int ff_connect_parallel(struct addrinfo *addrs, int timeout_ms_per_address,
int parallel, URLContext *h, int *fd, int parallel, URLContext *h, int *fd,
int (*customize_fd)(void *, int, int), void *customize_ctx); int (*customize_fd)(void *, int, int), void *customize_ctx);
void ff_udp_get_last_recv_addr(URLContext *h, struct sockaddr_storage *addr, socklen_t *addr_len);
#endif /* AVFORMAT_NETWORK_H */ #endif /* AVFORMAT_NETWORK_H */

View File

@ -78,6 +78,12 @@
#define UDP_MAX_PKT_SIZE 65536 #define UDP_MAX_PKT_SIZE 65536
#define UDP_HEADER_SIZE 8 #define UDP_HEADER_SIZE 8
typedef struct UDPQueuedPacketHeader {
int pkt_size;
struct sockaddr_storage addr;
socklen_t addr_len;
} UDPQueuedPacketHeader;
typedef struct UDPContext { typedef struct UDPContext {
const AVClass *class; const AVClass *class;
int udp_fd; int udp_fd;
@ -107,7 +113,7 @@ typedef struct UDPContext {
pthread_cond_t cond; pthread_cond_t cond;
int thread_started; int thread_started;
#endif #endif
uint8_t tmp[UDP_MAX_PKT_SIZE+4]; uint8_t tmp[UDP_MAX_PKT_SIZE + sizeof(UDPQueuedPacketHeader)];
int remaining_in_dg; int remaining_in_dg;
char *localaddr; char *localaddr;
int timeout; int timeout;
@ -115,6 +121,8 @@ typedef struct UDPContext {
char *sources; char *sources;
char *block; char *block;
IPSourceFilters filters; IPSourceFilters filters;
struct sockaddr_storage last_recv_addr;
socklen_t last_recv_addr_len;
} UDPContext; } UDPContext;
#define OFFSET(x) offsetof(UDPContext, x) #define OFFSET(x) offsetof(UDPContext, x)
@ -467,6 +475,13 @@ int ff_udp_get_local_port(URLContext *h)
return s->local_port; return s->local_port;
} }
void ff_udp_get_last_recv_addr(URLContext *h, struct sockaddr_storage *addr, socklen_t *addr_len)
{
UDPContext *s = h->priv_data;
*addr = s->last_recv_addr;
*addr_len = s->last_recv_addr_len;
}
/** /**
* Return the udp file handle for select() usage to wait for several RTP * Return the udp file handle for select() usage to wait for several RTP
* streams at the same time. * streams at the same time.
@ -495,30 +510,28 @@ static void *circular_buffer_task_rx( void *_URLContext)
goto end; goto end;
} }
while(1) { while(1) {
int len; UDPQueuedPacketHeader pkt_header;
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr);
pthread_mutex_unlock(&s->mutex); pthread_mutex_unlock(&s->mutex);
/* Blocking operations are always cancellation points; /* Blocking operations are always cancellation points;
see "General Information" / "Thread Cancelation Overview" see "General Information" / "Thread Cancelation Overview"
in Single Unix. */ in Single Unix. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
len = recvfrom(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0, (struct sockaddr *)&addr, &addr_len); pkt_header.pkt_size = recvfrom(s->udp_fd, s->tmp + sizeof(pkt_header), sizeof(s->tmp) - sizeof(pkt_header), 0, (struct sockaddr *)&pkt_header.addr, &pkt_header.addr_len);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
pthread_mutex_lock(&s->mutex); pthread_mutex_lock(&s->mutex);
if (len < 0) { if (pkt_header.pkt_size < 0) {
if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) { if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
s->circular_buffer_error = ff_neterrno(); s->circular_buffer_error = ff_neterrno();
goto end; goto end;
} }
continue; continue;
} }
if (ff_ip_check_source_lists(&addr, &s->filters)) if (ff_ip_check_source_lists(&pkt_header.addr, &s->filters))
continue; continue;
AV_WL32(s->tmp, len); memcpy(s->tmp, &pkt_header, sizeof(pkt_header));
if (av_fifo_can_write(s->fifo) < len + 4) { if (av_fifo_can_write(s->fifo) < pkt_header.pkt_size + sizeof(pkt_header)) {
/* No Space left */ /* No Space left */
if (s->overrun_nonfatal) { if (s->overrun_nonfatal) {
av_log(h, AV_LOG_WARNING, "Circular buffer overrun. " av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
@ -532,7 +545,7 @@ static void *circular_buffer_task_rx( void *_URLContext)
goto end; goto end;
} }
} }
av_fifo_write(s->fifo, s->tmp, len + 4); av_fifo_write(s->fifo, s->tmp, pkt_header.pkt_size + sizeof(pkt_header));
pthread_cond_signal(&s->cond); pthread_cond_signal(&s->cond);
} }
@ -991,8 +1004,6 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
{ {
UDPContext *s = h->priv_data; UDPContext *s = h->priv_data;
int ret; int ret;
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr);
#if HAVE_PTHREAD_CANCEL #if HAVE_PTHREAD_CANCEL
int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK; int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
@ -1001,17 +1012,21 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
do { do {
avail = av_fifo_can_read(s->fifo); avail = av_fifo_can_read(s->fifo);
if (avail) { // >=size) { if (avail) { // >=size) {
uint8_t tmp[4]; UDPQueuedPacketHeader header;
av_fifo_read(s->fifo, tmp, 4); av_fifo_read(s->fifo, &header, sizeof(header));
avail = AV_RL32(tmp);
s->last_recv_addr = header.addr;
s->last_recv_addr_len = header.addr_len;
avail = header.pkt_size;
if(avail > size){ if(avail > size){
av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n"); av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
avail = size; avail = size;
} }
av_fifo_read(s->fifo, buf, avail); av_fifo_read(s->fifo, buf, avail);
av_fifo_drain2(s->fifo, AV_RL32(tmp) - avail); av_fifo_drain2(s->fifo, header.pkt_size - avail);
pthread_mutex_unlock(&s->mutex); pthread_mutex_unlock(&s->mutex);
return avail; return avail;
} else if(s->circular_buffer_error){ } else if(s->circular_buffer_error){
@ -1043,10 +1058,11 @@ static int udp_read(URLContext *h, uint8_t *buf, int size)
if (ret < 0) if (ret < 0)
return ret; return ret;
} }
ret = recvfrom(s->udp_fd, buf, size, 0, (struct sockaddr *)&addr, &addr_len); s->last_recv_addr_len = sizeof(s->last_recv_addr);
ret = recvfrom(s->udp_fd, buf, size, 0, (struct sockaddr *)&s->last_recv_addr, &s->last_recv_addr_len);
if (ret < 0) if (ret < 0)
return ff_neterrno(); return ff_neterrno();
if (ff_ip_check_source_lists(&addr, &s->filters)) if (ff_ip_check_source_lists(&s->last_recv_addr, &s->filters))
return AVERROR(EINTR); return AVERROR(EINTR);
return ret; return ret;
} }