diff --git a/libavformat/network.c b/libavformat/network.c index c97e59d620..5664455d18 100644 --- a/libavformat/network.c +++ b/libavformat/network.c @@ -24,6 +24,7 @@ #include "url.h" #include "libavcodec/internal.h" #include "libavutil/avutil.h" +#include "libavutil/avassert.h" #include "libavutil/mem.h" #include "libavutil/time.h" @@ -299,6 +300,230 @@ int ff_listen_connect(int fd, const struct sockaddr *addr, return ret; } +static void interleave_addrinfo(struct addrinfo *base) +{ + struct addrinfo **next = &base->ai_next; + while (*next) { + struct addrinfo *cur = *next; + // Iterate forward until we find an entry of a different family. + if (cur->ai_family == base->ai_family) { + next = &cur->ai_next; + continue; + } + if (cur == base->ai_next) { + // If the first one following base is of a different family, just + // move base forward one step and continue. + base = cur; + next = &base->ai_next; + continue; + } + // Unchain cur from the rest of the list from its current spot. + *next = cur->ai_next; + // Hook in cur directly after base. + cur->ai_next = base->ai_next; + base->ai_next = cur; + // Restart with a new base. We know that before moving the cur element, + // everything between the previous base and cur had the same family, + // different from cur->ai_family. Therefore, we can keep next pointing + // where it was, and continue from there with base at the one after + // cur. + base = cur->ai_next; + } +} + +static void print_address_list(void *ctx, const struct addrinfo *addr, + const char *title) +{ + char hostbuf[100], portbuf[20]; + av_log(ctx, AV_LOG_DEBUG, "%s:\n", title); + while (addr) { + getnameinfo(addr->ai_addr, addr->ai_addrlen, + hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + av_log(ctx, AV_LOG_DEBUG, "Address %s port %s\n", hostbuf, portbuf); + addr = addr->ai_next; + } +} + +struct ConnectionAttempt { + int fd; + int64_t deadline_us; + struct addrinfo *addr; +}; + +// Returns < 0 on error, 0 on successfully started connection attempt, +// > 0 for a connection that succeeded already. +static int start_connect_attempt(struct ConnectionAttempt *attempt, + struct addrinfo **ptr, int timeout_ms, + URLContext *h, + void (*customize_fd)(void *, int), void *customize_ctx) +{ + struct addrinfo *ai = *ptr; + int ret; + + *ptr = ai->ai_next; + + attempt->fd = ff_socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (attempt->fd < 0) + return ff_neterrno(); + attempt->deadline_us = av_gettime_relative() + timeout_ms * 1000; + attempt->addr = ai; + + ff_socket_nonblock(attempt->fd, 1); + + if (customize_fd) + customize_fd(customize_ctx, attempt->fd); + + while ((ret = connect(attempt->fd, ai->ai_addr, ai->ai_addrlen))) { + ret = ff_neterrno(); + switch (ret) { + case AVERROR(EINTR): + if (ff_check_interrupt(&h->interrupt_callback)) { + closesocket(attempt->fd); + attempt->fd = -1; + return AVERROR_EXIT; + } + continue; + case AVERROR(EINPROGRESS): + case AVERROR(EAGAIN): + return 0; + default: + closesocket(attempt->fd); + attempt->fd = -1; + return ret; + } + } + return 1; +} + +// Try a new connection to another address after 200 ms, as suggested in +// RFC 8305 (or sooner if an earlier attempt fails). +#define NEXT_ATTEMPT_DELAY_MS 200 + +int ff_connect_parallel(struct addrinfo *addrs, int timeout_ms_per_address, + int parallel, URLContext *h, int *fd, + void (*customize_fd)(void *, int), void *customize_ctx) +{ + struct ConnectionAttempt attempts[3]; + struct pollfd pfd[3]; + int nb_attempts = 0, i, j; + int64_t next_attempt_us = av_gettime_relative(), next_deadline_us; + int last_err = AVERROR(EIO); + socklen_t optlen; + char errbuf[100], hostbuf[100], portbuf[20]; + + if (parallel > FF_ARRAY_ELEMS(attempts)) + parallel = FF_ARRAY_ELEMS(attempts); + + print_address_list(h, addrs, "Original list of addresses"); + // This mutates the list, but the head of the list is still the same + // element, so the caller, who owns the list, doesn't need to get + // an updated pointer. + interleave_addrinfo(addrs); + print_address_list(h, addrs, "Interleaved list of addresses"); + + while (nb_attempts > 0 || addrs) { + // Start a new connection attempt, if possible. + if (nb_attempts < parallel && addrs) { + getnameinfo(addrs->ai_addr, addrs->ai_addrlen, + hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + av_log(h, AV_LOG_VERBOSE, "Starting connection attempt to %s port %s\n", + hostbuf, portbuf); + last_err = start_connect_attempt(&attempts[nb_attempts], &addrs, + timeout_ms_per_address, h, + customize_fd, customize_ctx); + if (last_err < 0) { + av_strerror(last_err, errbuf, sizeof(errbuf)); + av_log(h, AV_LOG_VERBOSE, "Connected attempt failed: %s\n", + errbuf); + continue; + } + if (last_err > 0) { + for (i = 0; i < nb_attempts; i++) + closesocket(attempts[i].fd); + *fd = attempts[nb_attempts].fd; + return 0; + } + pfd[nb_attempts].fd = attempts[nb_attempts].fd; + pfd[nb_attempts].events = POLLOUT; + next_attempt_us = av_gettime_relative() + NEXT_ATTEMPT_DELAY_MS * 1000; + nb_attempts++; + } + + av_assert0(nb_attempts > 0); + // The connection attempts are sorted from oldest to newest, so the + // first one will have the earliest deadline. + next_deadline_us = attempts[0].deadline_us; + // If we can start another attempt in parallel, wait until that time. + if (nb_attempts < parallel && addrs) + next_deadline_us = FFMIN(next_deadline_us, next_attempt_us); + last_err = ff_poll_interrupt(pfd, nb_attempts, + (next_deadline_us - av_gettime_relative())/1000, + &h->interrupt_callback); + if (last_err < 0 && last_err != AVERROR(ETIMEDOUT)) + break; + + // Check the status from the poll output. + for (i = 0; i < nb_attempts; i++) { + last_err = 0; + if (pfd[i].revents) { + // Some sort of action for this socket, check its status (either + // a successful connection or an error). + optlen = sizeof(last_err); + if (getsockopt(attempts[i].fd, SOL_SOCKET, SO_ERROR, &last_err, &optlen)) + last_err = ff_neterrno(); + else if (last_err != 0) + last_err = AVERROR(last_err); + if (last_err == 0) { + // Everything is ok, we seem to have a successful + // connection. Close other sockets and return this one. + for (j = 0; j < nb_attempts; j++) + if (j != i) + closesocket(attempts[j].fd); + *fd = attempts[i].fd; + getnameinfo(attempts[i].addr->ai_addr, attempts[i].addr->ai_addrlen, + hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + av_log(h, AV_LOG_VERBOSE, "Successfully connected to %s port %s\n", + hostbuf, portbuf); + return 0; + } + } + if (attempts[i].deadline_us < av_gettime_relative() && !last_err) + last_err = AVERROR(ETIMEDOUT); + if (!last_err) + continue; + // Error (or timeout) for this socket; close the socket and remove + // it from the attempts/pfd arrays, to let a new attempt start + // directly. + getnameinfo(attempts[i].addr->ai_addr, attempts[i].addr->ai_addrlen, + hostbuf, sizeof(hostbuf), portbuf, sizeof(portbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + av_strerror(last_err, errbuf, sizeof(errbuf)); + av_log(h, AV_LOG_VERBOSE, "Connection attempt to %s port %s " + "failed: %s\n", hostbuf, portbuf, errbuf); + closesocket(attempts[i].fd); + memmove(&attempts[i], &attempts[i + 1], + (nb_attempts - i - 1) * sizeof(*attempts)); + memmove(&pfd[i], &pfd[i + 1], + (nb_attempts - i - 1) * sizeof(*pfd)); + i--; + nb_attempts--; + } + } + for (i = 0; i < nb_attempts; i++) + closesocket(attempts[i].fd); + if (last_err >= 0) + last_err = AVERROR(ECONNREFUSED); + if (last_err != AVERROR_EXIT) { + av_strerror(last_err, errbuf, sizeof(errbuf)); + av_log(h, AV_LOG_ERROR, "Connection to %s failed: %s\n", + h->filename, errbuf); + } + return last_err; +} + static int match_host_pattern(const char *pattern, const char *hostname) { int len_p, len_h; diff --git a/libavformat/network.h b/libavformat/network.h index 7843b908a1..7f467304a8 100644 --- a/libavformat/network.h +++ b/libavformat/network.h @@ -306,4 +306,32 @@ int ff_socket(int domain, int type, int protocol); void ff_log_net_error(void *ctx, int level, const char* prefix); +/** + * Connect to any of the given addrinfo addresses, with multiple attempts + * running in parallel. + * + * @param addrs The list of addresses to try to connect to. + * This list will be mutated internally, but the list head + * will remain as such, so this doesn't affect the caller + * freeing the list afterwards. + * @param timeout_ms_per_address The number of milliseconds to wait for each + * connection attempt. Since multiple addresses are tried, + * some of them in parallel, the total run time will at most + * be timeout_ms_per_address*ceil(nb_addrs/parallel) + + * (parallel - 1) * NEXT_ATTEMPT_DELAY_MS. + * @param parallel The maximum number of connections to attempt in parallel. + * This is limited to an internal maximum capacity. + * @param h URLContext providing interrupt check + * callback and logging context. + * @param fd If successful, the connected socket is returned here. + * @param customize_fd Function that will be called for each socket created, + * to allow the caller to set socket options before calling + * connect() on it, may be NULL. + * @param customize_ctx Context parameter passed to customize_fd. + * @return 0 on success, AVERROR on failure. + */ +int ff_connect_parallel(struct addrinfo *addrs, int timeout_ms_per_address, + int parallel, URLContext *h, int *fd, + void (*customize_fd)(void *, int), void *customize_ctx); + #endif /* AVFORMAT_NETWORK_H */