You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-07-13 01:00:23 +02:00
Move file descriptor read/write ready into IoRead/IoWrite.
Move sckSessionReadyRead()/Write() into the IoRead/IoWrite interfaces. This is a more logical place for them and the alternative would be to add them to the IoSession interface, which does not seem like a good idea. This is mostly a refactor, but a big change is the select() logic in fdRead.c has been replaced by ioReadReady(). This was duplicated code that was being used by our protocol but not TLS. Since we have not had any problems with requiring poll() in the field this seems like a good time to remove our dependence on select(). Also, IoFdWrite now requires a timeout so update where required, mostly in the tests.
This commit is contained in:
@ -78,6 +78,7 @@
|
||||
<release-development-list>
|
||||
<release-item>
|
||||
<commit subject="Add IoClient and IoSession interfaces."/>
|
||||
<commit subject="Move file descriptor read/write ready into IoRead/IoWrite."/>
|
||||
|
||||
<release-item-contributor-list>
|
||||
<release-item-reviewer id="stephen.frost"/>
|
||||
|
@ -67,6 +67,7 @@ SRCS = \
|
||||
common/io/bufferRead.c \
|
||||
common/io/bufferWrite.c \
|
||||
common/io/client.c \
|
||||
common/io/fd.c \
|
||||
common/io/fdRead.c \
|
||||
common/io/fdWrite.c \
|
||||
common/io/filter/buffer.c \
|
||||
|
@ -32,7 +32,7 @@ cmdLocal(int fdRead, int fdWrite)
|
||||
String *name = strNewFmt(PROTOCOL_SERVICE_LOCAL "-%u", cfgOptionUInt(cfgOptProcess));
|
||||
IoRead *read = ioFdReadNew(name, fdRead, (TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * 1000));
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(name, fdWrite);
|
||||
IoWrite *write = ioFdWriteNew(name, fdWrite, (TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * 1000));
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolServer *server = protocolServerNew(name, PROTOCOL_SERVICE_LOCAL_STR, read, write);
|
||||
|
@ -28,7 +28,7 @@ cmdRemote(int fdRead, int fdWrite)
|
||||
String *name = strNewFmt(PROTOCOL_SERVICE_REMOTE "-%u", cfgOptionUInt(cfgOptProcess));
|
||||
IoRead *read = ioFdReadNew(name, fdRead, (TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * 1000));
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(name, fdWrite);
|
||||
IoWrite *write = ioFdWriteNew(name, fdWrite, (TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * 1000));
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolServer *server = protocolServerNew(name, PROTOCOL_SERVICE_REMOTE_STR, read, write);
|
||||
|
@ -193,7 +193,8 @@ cmdStorageGet(void)
|
||||
{
|
||||
TRY_BEGIN()
|
||||
{
|
||||
result = storageGetProcess(ioFdWriteNew(STRDEF("stdout"), STDOUT_FILENO));
|
||||
result = storageGetProcess(
|
||||
ioFdWriteNew(STRDEF("stdout"), STDOUT_FILENO, (TimeMSec)(cfgOptionDbl(cfgOptIoTimeout) * 1000)));
|
||||
}
|
||||
// Ignore write errors because it's possible (even likely) that this output is being piped to something like head which
|
||||
// will exit when it gets what it needs and leave us writing to a broken pipe. It would be better to just ignore the broken
|
||||
|
@ -194,7 +194,8 @@ cmdStorageList(void)
|
||||
{
|
||||
TRY_BEGIN()
|
||||
{
|
||||
storageListRender(ioFdWriteNew(STRDEF("stdout"), STDOUT_FILENO));
|
||||
storageListRender(
|
||||
ioFdWriteNew(STRDEF("stdout"), STDOUT_FILENO, (TimeMSec)(cfgOptionDbl(cfgOptIoTimeout) * 1000)));
|
||||
}
|
||||
// Ignore write errors because it's possible (even likely) that this output is being piped to something like head which
|
||||
// will exit when it gets what it needs and leave us writing to a broken pipe. It would be better to just ignore the broken
|
||||
|
@ -354,7 +354,7 @@ execOpen(Exec *this)
|
||||
|
||||
// Assign file descriptors to io interfaces
|
||||
this->ioReadFd = ioFdReadNew(strNewFmt("%s read", strZ(this->name)), this->fdRead, this->timeout);
|
||||
this->ioWriteFd = ioFdWriteNew(strNewFmt("%s write", strZ(this->name)), this->fdWrite);
|
||||
this->ioWriteFd = ioFdWriteNew(strNewFmt("%s write", strZ(this->name)), this->fdWrite, this->timeout);
|
||||
ioWriteOpen(this->ioWriteFd);
|
||||
|
||||
// Create wrapper interfaces that check process state
|
||||
|
102
src/common/io/fd.c
Normal file
102
src/common/io/fd.c
Normal file
@ -0,0 +1,102 @@
|
||||
/***********************************************************************************************************************************
|
||||
File Descriptor Functions
|
||||
***********************************************************************************************************************************/
|
||||
#include "build.auto.h"
|
||||
|
||||
#include <poll.h>
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/io/fd.h"
|
||||
#include "common/log.h"
|
||||
#include "common/wait.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Use poll() to determine when data is ready to read/write on a socket. Retry after EINTR with whatever time is left on the timer.
|
||||
***********************************************************************************************************************************/
|
||||
// Helper to determine when poll() should be retried
|
||||
static bool
|
||||
fdReadyRetry(int pollResult, int errNo, bool first, TimeMSec *timeout, TimeMSec timeEnd)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(INT, pollResult);
|
||||
FUNCTION_TEST_PARAM(INT, errNo);
|
||||
FUNCTION_TEST_PARAM(BOOL, first);
|
||||
FUNCTION_TEST_PARAM_P(TIME_MSEC, timeout);
|
||||
FUNCTION_TEST_PARAM(TIME_MSEC, timeEnd);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(timeout != NULL);
|
||||
|
||||
// No retry by default
|
||||
bool result = false;
|
||||
|
||||
// Process errors
|
||||
if (pollResult == -1)
|
||||
{
|
||||
// Don't error on an interrupt. If the interrupt lasts long enough there may be a timeout, though.
|
||||
if (errNo != EINTR)
|
||||
THROW_SYS_ERROR_CODE(errNo, KernelError, "unable to poll socket");
|
||||
|
||||
// Always retry on the first iteration
|
||||
if (first)
|
||||
{
|
||||
result = true;
|
||||
}
|
||||
// Else retry if there is time left
|
||||
else
|
||||
{
|
||||
TimeMSec timeCurrent = timeMSec();
|
||||
|
||||
if (timeEnd > timeCurrent)
|
||||
{
|
||||
*timeout = timeEnd - timeCurrent;
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FUNCTION_TEST_RETURN(result);
|
||||
}
|
||||
|
||||
bool
|
||||
fdReady(int fd, bool read, bool write, TimeMSec timeout)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(INT, fd);
|
||||
FUNCTION_LOG_PARAM(BOOL, read);
|
||||
FUNCTION_LOG_PARAM(BOOL, write);
|
||||
FUNCTION_LOG_PARAM(TIME_MSEC, timeout);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(fd >= 0);
|
||||
ASSERT(read || write);
|
||||
ASSERT(timeout < INT_MAX);
|
||||
|
||||
// Poll settings
|
||||
struct pollfd inputFd = {.fd = fd};
|
||||
|
||||
if (read)
|
||||
inputFd.events |= POLLIN;
|
||||
|
||||
if (write)
|
||||
inputFd.events |= POLLOUT;
|
||||
|
||||
// Wait for ready or timeout
|
||||
TimeMSec timeEnd = timeMSec() + timeout;
|
||||
bool first = true;
|
||||
|
||||
// Initialize result and errno to look like a retryable error. We have no good way to test this function with interrupts so this
|
||||
// at least ensures that the condition is retried.
|
||||
int result = -1;
|
||||
int errNo = EINTR;
|
||||
|
||||
while (fdReadyRetry(result, errNo, first, &timeout, timeEnd))
|
||||
{
|
||||
result = poll(&inputFd, 1, (int)timeout);
|
||||
|
||||
errNo = errno;
|
||||
first = false;
|
||||
}
|
||||
|
||||
FUNCTION_LOG_RETURN(BOOL, result > 0);
|
||||
}
|
29
src/common/io/fd.h
Normal file
29
src/common/io/fd.h
Normal file
@ -0,0 +1,29 @@
|
||||
/***********************************************************************************************************************************
|
||||
File Descriptor Functions
|
||||
***********************************************************************************************************************************/
|
||||
#ifndef COMMON_IO_FD_H
|
||||
#define COMMON_IO_FD_H
|
||||
|
||||
#include "common/time.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
// Wait until the file descriptor is ready to read/write or timeout
|
||||
bool fdReady(int fd, bool read, bool write, TimeMSec timeout);
|
||||
|
||||
// Wait until the file descriptor is ready to read or timeout
|
||||
__attribute__((always_inline)) static inline bool
|
||||
fdReadyRead(int fd, TimeMSec timeout)
|
||||
{
|
||||
return fdReady(fd, true, false, timeout);
|
||||
}
|
||||
|
||||
// Wait until the file descriptor is ready to write or timeout
|
||||
__attribute__((always_inline)) static inline bool
|
||||
fdReadyWrite(int fd, TimeMSec timeout)
|
||||
{
|
||||
return fdReady(fd, false, true, timeout);
|
||||
}
|
||||
|
||||
#endif
|
@ -3,10 +3,10 @@ File Descriptor Io Read
|
||||
***********************************************************************************************************************************/
|
||||
#include "build.auto.h"
|
||||
|
||||
#include <sys/select.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/io/fd.h"
|
||||
#include "common/io/fdRead.h"
|
||||
#include "common/io/read.intern.h"
|
||||
#include "common/log.h"
|
||||
@ -33,6 +33,37 @@ Macros for function logging
|
||||
#define FUNCTION_LOG_IO_FD_READ_FORMAT(value, buffer, bufferSize) \
|
||||
objToLog(value, "IoFdRead", buffer, bufferSize)
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Are there bytes ready to read immediately?
|
||||
***********************************************************************************************************************************/
|
||||
static bool
|
||||
ioFdReadReady(THIS_VOID, bool error)
|
||||
{
|
||||
THIS(IoFdRead);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(IO_FD_READ, this);
|
||||
FUNCTION_LOG_PARAM(BOOL, error);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
bool result = true;
|
||||
|
||||
// Check if the file descriptor is ready to read
|
||||
if (!fdReadyRead(this->fd, this->timeout))
|
||||
{
|
||||
// Error if requested
|
||||
if (error)
|
||||
THROW_FMT(FileReadError, "timeout after %" PRIu64 "ms waiting for read from '%s'", this->timeout, strZ(this->name));
|
||||
|
||||
// File descriptor is not ready to read
|
||||
result = false;
|
||||
}
|
||||
|
||||
FUNCTION_LOG_RETURN(BOOL, result);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Read data from the file descriptor
|
||||
***********************************************************************************************************************************/
|
||||
@ -58,25 +89,8 @@ ioFdRead(THIS_VOID, Buffer *buffer, bool block)
|
||||
{
|
||||
do
|
||||
{
|
||||
// Initialize the file descriptor set used for select
|
||||
fd_set selectSet;
|
||||
FD_ZERO(&selectSet);
|
||||
|
||||
// We know the file descriptor is not negative because it passed error handling, so it is safe to cast to unsigned
|
||||
FD_SET((unsigned int)this->fd, &selectSet);
|
||||
|
||||
// Initialize timeout struct used for select. Recreate this structure each time since Linux (at least) will modify it.
|
||||
struct timeval timeoutSelect;
|
||||
timeoutSelect.tv_sec = (time_t)(this->timeout / MSEC_PER_SEC);
|
||||
timeoutSelect.tv_usec = (time_t)(this->timeout % MSEC_PER_SEC * 1000);
|
||||
|
||||
// Determine if there is data to be read
|
||||
int result = select(this->fd + 1, &selectSet, NULL, NULL, &timeoutSelect);
|
||||
THROW_ON_SYS_ERROR_FMT(result == -1, FileReadError, "unable to select from %s", strZ(this->name));
|
||||
|
||||
// If no data read after time allotted then error
|
||||
if (!result)
|
||||
THROW_FMT(FileReadError, "unable to read data from %s after %" PRIu64 "ms", strZ(this->name), this->timeout);
|
||||
// Check if there is data to be read and error if not
|
||||
ioFdReadReady(this, true);
|
||||
|
||||
// Read and handle errors
|
||||
THROW_ON_SYS_ERROR_FMT(
|
||||
@ -156,7 +170,7 @@ ioFdReadNew(const String *name, int fd, TimeMSec timeout)
|
||||
.timeout = timeout,
|
||||
};
|
||||
|
||||
this = ioReadNewP(driver, .block = true, .eof = ioFdReadEof, .fd = ioFdReadFd, .read = ioFdRead);
|
||||
this = ioReadNewP(driver, .block = true, .eof = ioFdReadEof, .fd = ioFdReadFd, .read = ioFdRead, .ready = ioFdReadReady);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
|
@ -6,6 +6,7 @@ File Descriptor Io Write
|
||||
#include <unistd.h>
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/io/fd.h"
|
||||
#include "common/io/fdWrite.h"
|
||||
#include "common/io/write.intern.h"
|
||||
#include "common/log.h"
|
||||
@ -20,6 +21,7 @@ typedef struct IoFdWrite
|
||||
MemContext *memContext; // Object memory context
|
||||
const String *name; // File descriptor name for error messages
|
||||
int fd; // File descriptor to write to
|
||||
TimeMSec timeout; // Timeout for write operation
|
||||
} IoFdWrite;
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@ -30,6 +32,37 @@ Macros for function logging
|
||||
#define FUNCTION_LOG_IO_FD_WRITE_FORMAT(value, buffer, bufferSize) \
|
||||
objToLog(value, "IoFdWrite", buffer, bufferSize)
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
// Can bytes be written immediately?
|
||||
***********************************************************************************************************************************/
|
||||
static bool
|
||||
ioFdWriteReady(THIS_VOID, bool error)
|
||||
{
|
||||
THIS(IoFdWrite);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(IO_FD_WRITE, this);
|
||||
FUNCTION_LOG_PARAM(BOOL, error);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
bool result = true;
|
||||
|
||||
// Check if the file descriptor is ready to write
|
||||
if (!fdReadyWrite(this->fd, this->timeout))
|
||||
{
|
||||
// Error if requested
|
||||
if (error)
|
||||
THROW_FMT(FileWriteError, "timeout after %" PRIu64 "ms waiting for write to '%s'", this->timeout, strZ(this->name));
|
||||
|
||||
// File descriptor is not ready to write
|
||||
result = false;
|
||||
}
|
||||
|
||||
FUNCTION_LOG_RETURN(BOOL, result);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Write to the file descriptor
|
||||
***********************************************************************************************************************************/
|
||||
@ -71,10 +104,11 @@ ioFdWriteFd(const THIS_VOID)
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
IoWrite *
|
||||
ioFdWriteNew(const String *name, int fd)
|
||||
ioFdWriteNew(const String *name, int fd, TimeMSec timeout)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(INT, fd);
|
||||
FUNCTION_LOG_PARAM(TIME_MSEC, timeout);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
IoWrite *this = NULL;
|
||||
@ -88,9 +122,10 @@ ioFdWriteNew(const String *name, int fd)
|
||||
.memContext = memContextCurrent(),
|
||||
.name = strDup(name),
|
||||
.fd = fd,
|
||||
.timeout = timeout,
|
||||
};
|
||||
|
||||
this = ioWriteNewP(driver, .fd = ioFdWriteFd, .write = ioFdWrite);
|
||||
this = ioWriteNewP(driver, .fd = ioFdWriteFd, .ready = ioFdWriteReady, .write = ioFdWrite);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
|
@ -7,11 +7,12 @@ Write to a file descriptor using the IoWrite interface.
|
||||
#define COMMON_IO_FDWRITE_H
|
||||
|
||||
#include "common/io/write.h"
|
||||
#include "common/time.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Constructors
|
||||
***********************************************************************************************************************************/
|
||||
IoWrite *ioFdWriteNew(const String *name, int fd);
|
||||
IoWrite *ioFdWriteNew(const String *name, int fd, TimeMSec timeout);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Helper functions
|
||||
|
@ -294,6 +294,25 @@ ioReadLine(IoRead *this)
|
||||
FUNCTION_LOG_RETURN(STRING, ioReadLineParam(this, false));
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
bool
|
||||
ioReadReady(IoRead *this, IoReadReadyParam param)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(IO_READ, this);
|
||||
FUNCTION_LOG_PARAM(BOOL, param.error);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
bool result = true;
|
||||
|
||||
if (this->interface.ready != NULL)
|
||||
result = this->interface.ready(this->driver, param.error);
|
||||
|
||||
FUNCTION_LOG_RETURN(BOOL, result);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
void
|
||||
ioReadClose(IoRead *this)
|
||||
|
@ -35,6 +35,19 @@ String *ioReadLine(IoRead *this);
|
||||
// Read linefeed-terminated string and optionally error on eof
|
||||
String *ioReadLineParam(IoRead *this, bool allowEof);
|
||||
|
||||
// Are there bytes ready to read immediately? There are no guarantees on how much data is available to read but it must be at least
|
||||
// one byte.
|
||||
typedef struct IoReadReadyParam
|
||||
{
|
||||
VAR_PARAM_HEADER;
|
||||
bool error; // Error when read not ready
|
||||
} IoReadReadyParam;
|
||||
|
||||
#define ioReadReadyP(this, ...) \
|
||||
ioReadReady(this, (IoReadReadyParam){VAR_PARAM_INIT, __VA_ARGS__})
|
||||
|
||||
bool ioReadReady(IoRead *this, IoReadReadyParam param);
|
||||
|
||||
// Close the IO
|
||||
void ioReadClose(IoRead *this);
|
||||
|
||||
|
@ -21,6 +21,10 @@ typedef struct IoReadInterface
|
||||
bool (*open)(void *driver);
|
||||
int (*fd)(const void *driver);
|
||||
size_t (*read)(void *driver, Buffer *buffer, bool block);
|
||||
|
||||
// Are there bytes ready to read immediately? There are no guarantees on how much data is available to read but it must be at
|
||||
// least one byte. Optionally error when read is not ready.
|
||||
bool (*ready)(void *driver, bool error);
|
||||
} IoReadInterface;
|
||||
|
||||
#define ioReadNewP(driver, ...) \
|
||||
|
@ -6,11 +6,10 @@ Socket Common Functions
|
||||
#include <fcntl.h>
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/tcp.h>
|
||||
#include <poll.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/io/fd.h"
|
||||
#include "common/io/socket/common.h"
|
||||
#include "common/log.h"
|
||||
#include "common/wait.h"
|
||||
@ -168,7 +167,7 @@ sckConnect(int fd, const String *host, unsigned int port, const struct addrinfo
|
||||
if (sckConnectInProgress(errNo))
|
||||
{
|
||||
// Wait for write-ready
|
||||
if (!sckReadyWrite(fd, timeout))
|
||||
if (!fdReadyWrite(fd, timeout))
|
||||
THROW_FMT(HostConnectError, "timeout connecting to '%s:%u'", strZ(host), port);
|
||||
|
||||
// Check for success or error. If the connection was successful this will set errNo to 0.
|
||||
@ -185,106 +184,3 @@ sckConnect(int fd, const String *host, unsigned int port, const struct addrinfo
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Use poll() to determine when data is ready to read/write on a socket. Retry after EINTR with whatever time is left on the timer.
|
||||
***********************************************************************************************************************************/
|
||||
// Helper to determine when poll() should be retried
|
||||
static bool
|
||||
sckReadyRetry(int pollResult, int errNo, bool first, TimeMSec *timeout, TimeMSec timeEnd)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(INT, pollResult);
|
||||
FUNCTION_TEST_PARAM(INT, errNo);
|
||||
FUNCTION_TEST_PARAM(BOOL, first);
|
||||
FUNCTION_TEST_PARAM_P(TIME_MSEC, timeout);
|
||||
FUNCTION_TEST_PARAM(TIME_MSEC, timeEnd);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(timeout != NULL);
|
||||
|
||||
// No retry by default
|
||||
bool result = false;
|
||||
|
||||
// Process errors
|
||||
if (pollResult == -1)
|
||||
{
|
||||
// Don't error on an interrupt. If the interrupt lasts long enough there may be a timeout, though.
|
||||
if (errNo != EINTR)
|
||||
THROW_SYS_ERROR_CODE(errNo, KernelError, "unable to poll socket");
|
||||
|
||||
// Always retry on the first iteration
|
||||
if (first)
|
||||
{
|
||||
result = true;
|
||||
}
|
||||
// Else retry if there is time left
|
||||
else
|
||||
{
|
||||
TimeMSec timeCurrent = timeMSec();
|
||||
|
||||
if (timeEnd > timeCurrent)
|
||||
{
|
||||
*timeout = timeEnd - timeCurrent;
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FUNCTION_TEST_RETURN(result);
|
||||
}
|
||||
|
||||
bool
|
||||
sckReady(int fd, bool read, bool write, TimeMSec timeout)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(INT, fd);
|
||||
FUNCTION_LOG_PARAM(BOOL, read);
|
||||
FUNCTION_LOG_PARAM(BOOL, write);
|
||||
FUNCTION_LOG_PARAM(TIME_MSEC, timeout);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(fd >= 0);
|
||||
ASSERT(read || write);
|
||||
ASSERT(timeout < INT_MAX);
|
||||
|
||||
// Poll settings
|
||||
struct pollfd inputFd = {.fd = fd};
|
||||
|
||||
if (read)
|
||||
inputFd.events |= POLLIN;
|
||||
|
||||
if (write)
|
||||
inputFd.events |= POLLOUT;
|
||||
|
||||
// Wait for ready or timeout
|
||||
TimeMSec timeEnd = timeMSec() + timeout;
|
||||
bool first = true;
|
||||
|
||||
// Initialize result and errno to look like a retryable error. We have no good way to test this function with interrupts so this
|
||||
// at least ensures that the condition is retried.
|
||||
int result = -1;
|
||||
int errNo = EINTR;
|
||||
|
||||
while (sckReadyRetry(result, errNo, first, &timeout, timeEnd))
|
||||
{
|
||||
result = poll(&inputFd, 1, (int)timeout);
|
||||
|
||||
errNo = errno;
|
||||
first = false;
|
||||
}
|
||||
|
||||
FUNCTION_LOG_RETURN(BOOL, result > 0);
|
||||
}
|
||||
|
||||
bool
|
||||
sckReadyRead(int fd, TimeMSec timeout)
|
||||
{
|
||||
return sckReady(fd, true, false, timeout);
|
||||
}
|
||||
|
||||
bool
|
||||
sckReadyWrite(int fd, TimeMSec timeout)
|
||||
{
|
||||
return sckReady(fd, false, true, timeout);
|
||||
}
|
||||
|
@ -21,9 +21,4 @@ void sckOptionSet(int fd);
|
||||
// Connect socket to an IP address
|
||||
void sckConnect(int fd, const String *host, unsigned int port, const struct addrinfo *hostAddress, TimeMSec timeout);
|
||||
|
||||
// Wait until the socket is ready to read/write or timeout
|
||||
bool sckReady(int fd, bool read, bool write, TimeMSec timeout);
|
||||
bool sckReadyRead(int fd, TimeMSec timeout);
|
||||
bool sckReadyWrite(int fd, TimeMSec timeout);
|
||||
|
||||
#endif
|
||||
|
@ -7,6 +7,8 @@ Socket Session
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/log.h"
|
||||
#include "common/io/fdRead.h"
|
||||
#include "common/io/fdWrite.h"
|
||||
#include "common/io/socket/client.h"
|
||||
#include "common/io/socket/common.h"
|
||||
#include "common/memContext.h"
|
||||
@ -24,11 +26,16 @@ struct SocketSession
|
||||
String *host; // Hostname or IP address
|
||||
unsigned int port; // Port to connect to host on
|
||||
TimeMSec timeout; // Timeout for any i/o operation (connect, read, etc.)
|
||||
|
||||
IoRead *read; // IoRead interface to the file descriptor
|
||||
IoWrite *write; // IoWrite interface to the file descriptor
|
||||
};
|
||||
|
||||
OBJECT_DEFINE_MOVE(SOCKET_SESSION);
|
||||
|
||||
OBJECT_DEFINE_GET(Fd, , SOCKET_SESSION, int, fd);
|
||||
OBJECT_DEFINE_GET(IoRead, , SOCKET_SESSION, IoRead *, read);
|
||||
OBJECT_DEFINE_GET(IoWrite, , SOCKET_SESSION, IoWrite *, write);
|
||||
OBJECT_DEFINE_GET(Type, const, SOCKET_SESSION, SocketSessionType, type);
|
||||
|
||||
OBJECT_DEFINE_FREE(SOCKET_SESSION);
|
||||
@ -63,6 +70,8 @@ sckSessionNew(SocketSessionType type, int fd, const String *host, unsigned int p
|
||||
{
|
||||
this = memNew(sizeof(SocketSession));
|
||||
|
||||
String *name = strNewFmt("%s:%u", strZ(host), port);
|
||||
|
||||
*this = (SocketSession)
|
||||
{
|
||||
.memContext = MEM_CONTEXT_NEW(),
|
||||
@ -71,53 +80,18 @@ sckSessionNew(SocketSessionType type, int fd, const String *host, unsigned int p
|
||||
.host = strDup(host),
|
||||
.port = port,
|
||||
.timeout = timeout,
|
||||
.read = ioFdReadNew(name, fd, timeout),
|
||||
.write = ioFdWriteNew(name, fd, timeout),
|
||||
};
|
||||
|
||||
memContextCallbackSet(this->memContext, sckSessionFreeResource, this);
|
||||
strFree(name);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(SOCKET_SESSION, this);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
void
|
||||
sckSessionReadyRead(SocketSession *this)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(SOCKET_SESSION, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
if (!sckReadyRead(this->fd, this->timeout))
|
||||
{
|
||||
THROW_FMT(
|
||||
ProtocolError, "timeout after %" PRIu64 "ms waiting for read from '%s:%u'", this->timeout, strZ(this->host),
|
||||
this->port);
|
||||
}
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
void
|
||||
sckSessionReadyWrite(SocketSession *this)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(SOCKET_SESSION, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
if (!sckReadyWrite(this->fd, this->timeout))
|
||||
{
|
||||
THROW_FMT(
|
||||
ProtocolError, "timeout after %" PRIu64 "ms waiting for write to '%s:%u'", this->timeout, strZ(this->host), this->port);
|
||||
}
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
String *
|
||||
sckSessionToLog(const SocketSession *this)
|
||||
|
@ -25,6 +25,8 @@ Object type
|
||||
|
||||
typedef struct SocketSession SocketSession;
|
||||
|
||||
#include "common/io/read.h"
|
||||
#include "common/io/write.h"
|
||||
#include "common/time.h"
|
||||
#include "common/type/string.h"
|
||||
|
||||
@ -33,16 +35,21 @@ Constructors
|
||||
***********************************************************************************************************************************/
|
||||
SocketSession *sckSessionNew(SocketSessionType type, int fd, const String *host, unsigned int port, TimeMSec timeout);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
// Read interface
|
||||
IoRead *sckSessionIoRead(SocketSession *this);
|
||||
|
||||
// Write interface
|
||||
IoWrite *sckSessionIoWrite(SocketSession *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
// Move to a new parent mem context
|
||||
SocketSession *sckSessionMove(SocketSession *this, MemContext *parentNew);
|
||||
|
||||
// Check if there is data ready to read/write on the socket
|
||||
void sckSessionReadyRead(SocketSession *this);
|
||||
void sckSessionReadyWrite(SocketSession *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -137,7 +137,7 @@ tlsSessionResultProcess(TlsSession *this, int errorTls, long unsigned int errorT
|
||||
// Try again after waiting for read ready
|
||||
case SSL_ERROR_WANT_READ:
|
||||
{
|
||||
sckSessionReadyRead(this->socketSession);
|
||||
ioReadReadyP(sckSessionIoRead(this->socketSession), .error = true);
|
||||
result = 0;
|
||||
break;
|
||||
}
|
||||
@ -145,7 +145,7 @@ tlsSessionResultProcess(TlsSession *this, int errorTls, long unsigned int errorT
|
||||
// Try again after waiting for write ready
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
{
|
||||
sckSessionReadyWrite(this->socketSession);
|
||||
ioWriteReadyP(sckSessionIoWrite(this->socketSession), .error = true);
|
||||
result = 0;
|
||||
break;
|
||||
}
|
||||
@ -221,7 +221,7 @@ tlsSessionRead(THIS_VOID, Buffer *buffer, bool block)
|
||||
{
|
||||
// If no TLS data pending then check the socket to reduce blocking
|
||||
if (!SSL_pending(this->session))
|
||||
sckSessionReadyRead(this->socketSession);
|
||||
ioReadReadyP(sckSessionIoRead(this->socketSession), .error = true);
|
||||
|
||||
// Read and handle errors. The error queue must be cleared before this operation.
|
||||
ERR_clear_error();
|
||||
|
@ -143,6 +143,25 @@ ioWriteLine(IoWrite *this, const Buffer *buffer)
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
bool
|
||||
ioWriteReady(IoWrite *this, IoWriteReadyParam param)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(IO_WRITE, this);
|
||||
FUNCTION_LOG_PARAM(BOOL, param.error);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
bool result = true;
|
||||
|
||||
if (this->interface.ready != NULL)
|
||||
result = this->interface.ready(this->driver, param.error);
|
||||
|
||||
FUNCTION_LOG_RETURN(BOOL, result);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
void
|
||||
ioWriteStr(IoWrite *this, const String *string)
|
||||
|
@ -31,6 +31,18 @@ void ioWrite(IoWrite *this, const Buffer *buffer);
|
||||
// Write linefeed-terminated buffer
|
||||
void ioWriteLine(IoWrite *this, const Buffer *buffer);
|
||||
|
||||
// Can bytes be written immediately? There are no guarantees on how much data can be written but it must be at least one byte.
|
||||
typedef struct IoWriteReadyParam
|
||||
{
|
||||
VAR_PARAM_HEADER;
|
||||
bool error; // Error when write not ready
|
||||
} IoWriteReadyParam;
|
||||
|
||||
#define ioWriteReadyP(this, ...) \
|
||||
ioWriteReady(this, (IoWriteReadyParam){VAR_PARAM_INIT, __VA_ARGS__})
|
||||
|
||||
bool ioWriteReady(IoWrite *this, IoWriteReadyParam param);
|
||||
|
||||
// Write string
|
||||
void ioWriteStr(IoWrite *this, const String *string);
|
||||
|
||||
|
@ -14,6 +14,11 @@ typedef struct IoWriteInterface
|
||||
void (*close)(void *driver);
|
||||
int (*fd)(const void *driver);
|
||||
void (*open)(void *driver);
|
||||
|
||||
// Can bytes be written immediately? There are no guarantees on how much data can be written but it must be at least one byte.
|
||||
// Optionally error when write is not ready.
|
||||
bool (*ready)(void *driver, bool error);
|
||||
|
||||
void (*write)(void *driver, const Buffer *buffer);
|
||||
} IoWriteInterface;
|
||||
|
||||
|
@ -226,6 +226,7 @@ unit:
|
||||
coverage:
|
||||
common/io/bufferRead: full
|
||||
common/io/bufferWrite: full
|
||||
common/io/fd: full
|
||||
common/io/fdRead: full
|
||||
common/io/fdWrite: full
|
||||
common/io/filter/buffer: full
|
||||
@ -250,6 +251,10 @@ unit:
|
||||
common/io/socket/common: full
|
||||
common/io/socket/session: full
|
||||
|
||||
include:
|
||||
- common/io/fdRead
|
||||
- common/io/read
|
||||
|
||||
# ----------------------------------------------------------------------------------------------------------------------------
|
||||
- name: io-http
|
||||
total: 5
|
||||
|
@ -540,7 +540,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("child read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
lockAcquire(cfgOptionStr(cfgOptLockPath), cfgOptionStr(cfgOptStanza), cfgLockType(), 30000, true);
|
||||
@ -558,7 +558,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("parent read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Wait for the child to acquire the lock
|
||||
|
@ -152,7 +152,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("child read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
int lockFd = open(strZ(strNewFmt("%s/empty" LOCK_FILE_EXT, strZ(lockPath))), O_RDONLY, 0);
|
||||
@ -175,7 +175,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("parent read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Wait for the child to acquire the lock
|
||||
@ -209,7 +209,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("child read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
int lockFd = open(strZ(strNewFmt("%s/empty" LOCK_FILE_EXT, strZ(lockPath))), O_RDONLY, 0);
|
||||
@ -232,7 +232,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("parent read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Wait for the child to acquire the lock
|
||||
@ -261,7 +261,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("child read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
@ -280,7 +280,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("parent read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Wait for the child to acquire the lock
|
||||
@ -309,7 +309,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("child read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("child write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
int lockFd = open(strZ(strNewFmt("%s/badpid" LOCK_FILE_EXT, strZ(lockPath))), O_RDONLY, 0);
|
||||
@ -333,7 +333,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("parent read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("parent write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Wait for the child to acquire the lock
|
||||
|
@ -46,7 +46,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolClient *client = protocolClientNew(strNew("test"), PROTOCOL_SERVICE_LOCAL_STR, read, write);
|
||||
|
@ -45,7 +45,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolClient *client = protocolClientNew(strNew("test"), PROTOCOL_SERVICE_REMOTE_STR, read, write);
|
||||
@ -81,7 +81,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolClient *client = NULL;
|
||||
@ -117,7 +117,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
TEST_ERROR(
|
||||
@ -149,7 +149,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolClient *client = NULL;
|
||||
@ -187,7 +187,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
storagePutP(storageNewWriteP(storageData, strNew("lock/all" STOP_FILE_EXT)), NULL);
|
||||
|
@ -100,7 +100,7 @@ testRun(void)
|
||||
|
||||
TEST_ERROR(execFreeResource(exec), ExecuteError, "sleep did not exit when expected");
|
||||
|
||||
TEST_ERROR(ioReadLine(execIoRead(exec)), FileReadError, "unable to select from sleep read: [9] Bad file descriptor");
|
||||
TEST_ERROR(ioReadLine(execIoRead(exec)), FileReadError, "unable to read from sleep read: [9] Bad file descriptor");
|
||||
ioWriteStrLine(execIoWrite(exec), strNew(""));
|
||||
TEST_ERROR(ioWriteFlush(execIoWrite(exec)), FileWriteError, "unable to write to sleep write: [9] Bad file descriptor");
|
||||
|
||||
|
@ -199,7 +199,7 @@ testRun(void)
|
||||
|
||||
HARNESS_FORK_PARENT_BEGIN()
|
||||
{
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("test client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0)));
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("test client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000));
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("create client");
|
||||
|
@ -2,6 +2,7 @@
|
||||
Test IO
|
||||
***********************************************************************************************************************************/
|
||||
#include <fcntl.h>
|
||||
#include <netdb.h>
|
||||
|
||||
#include "common/type/json.h"
|
||||
|
||||
@ -278,6 +279,7 @@ testRun(void)
|
||||
"create io read object");
|
||||
|
||||
TEST_RESULT_BOOL(ioReadOpen(read), true, " open io object");
|
||||
TEST_RESULT_BOOL(ioReadReadyP(read), true, "read defaults to ready");
|
||||
TEST_RESULT_UINT(ioRead(read, buffer), 2, " read 2 bytes");
|
||||
TEST_RESULT_BOOL(ioReadEof(read), false, " no eof");
|
||||
TEST_RESULT_VOID(ioReadClose(read), " close io object");
|
||||
@ -483,6 +485,7 @@ testRun(void)
|
||||
"create io write object");
|
||||
|
||||
TEST_RESULT_VOID(ioWriteOpen(write), " open io object");
|
||||
TEST_RESULT_BOOL(ioWriteReadyP(write), true, "write defaults to ready");
|
||||
TEST_RESULT_BOOL(testIoWriteOpenCalled, true, " check io object open");
|
||||
TEST_RESULT_VOID(ioWriteStr(write, STRDEF("ABC")), " write 3 bytes");
|
||||
TEST_RESULT_VOID(ioWriteClose(write), " close io object");
|
||||
@ -538,8 +541,9 @@ testRun(void)
|
||||
{
|
||||
IoWrite *write = NULL;
|
||||
|
||||
TEST_ASSIGN(write, ioFdWriteNew(strNew("write test"), HARNESS_FORK_CHILD_WRITE()), "move write");
|
||||
TEST_ASSIGN(write, ioFdWriteNew(strNew("write test"), HARNESS_FORK_CHILD_WRITE(), 1000), "move write");
|
||||
ioWriteOpen(write);
|
||||
TEST_RESULT_BOOL(ioWriteReadyP(write), true, "write is ready");
|
||||
TEST_RESULT_INT(ioWriteFd(write), ((IoFdWrite *)write->driver)->fd, "check write fd");
|
||||
|
||||
// Write a line to be read
|
||||
@ -575,7 +579,11 @@ testRun(void)
|
||||
// Only part of the buffer is written before timeout
|
||||
Buffer *buffer = bufNew(16);
|
||||
|
||||
TEST_ERROR(ioRead(read, buffer), FileReadError, "unable to read data from read test after 1000ms");
|
||||
((IoFdRead *)read->driver)->timeout = 1;
|
||||
TEST_RESULT_BOOL(ioReadReadyP(read), false, "read is not ready (without throwing error)");
|
||||
((IoFdRead *)read->driver)->timeout = 1000;
|
||||
|
||||
TEST_ERROR(ioRead(read, buffer), FileReadError, "timeout after 1000ms waiting for read from 'read test'");
|
||||
TEST_RESULT_UINT(bufSize(buffer), 16, "buffer is only partially read");
|
||||
|
||||
// Read a buffer that is transmitted in two parts with blocking on the read side
|
||||
@ -605,6 +613,68 @@ testRun(void)
|
||||
int fd = open(strZ(fileName), O_CREAT | O_TRUNC | O_WRONLY, 0700);
|
||||
|
||||
TEST_RESULT_VOID(ioFdWriteOneStr(fd, strNew("test1\ntest2")), "write string to file");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("fdReadyRetry() edge conditions");
|
||||
|
||||
TimeMSec timeout = 5757;
|
||||
TEST_RESULT_BOOL(fdReadyRetry(-1, EINTR, true, &timeout, 0), true, "first retry does not modify timeout");
|
||||
TEST_RESULT_UINT(timeout, 5757, " check timeout");
|
||||
|
||||
timeout = 0;
|
||||
TEST_RESULT_BOOL(fdReadyRetry(-1, EINTR, false, &timeout, timeMSec() + 10000), true, "retry before timeout");
|
||||
TEST_RESULT_BOOL(timeout > 0, true, " check timeout");
|
||||
|
||||
TEST_RESULT_BOOL(fdReadyRetry(-1, EINTR, false, &timeout, timeMSec()), false, "no retry after timeout");
|
||||
TEST_ERROR(fdReadyRetry(-1, EINVAL, true, &timeout, 0), KernelError, "unable to poll socket: [22] Invalid argument");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("write is not ready on bad socket connection");
|
||||
|
||||
struct addrinfo hints = (struct addrinfo)
|
||||
{
|
||||
.ai_family = AF_UNSPEC,
|
||||
.ai_socktype = SOCK_STREAM,
|
||||
.ai_protocol = IPPROTO_TCP,
|
||||
};
|
||||
|
||||
int result;
|
||||
const char *hostBad = "172.31.255.255";
|
||||
struct addrinfo *hostBadAddress;
|
||||
|
||||
if ((result = getaddrinfo(hostBad, "7777", &hints, &hostBadAddress)) != 0)
|
||||
{
|
||||
THROW_FMT( // {uncoverable - lookup on IP should never fail}
|
||||
HostConnectError, "unable to get address for '%s': [%d] %s", hostBad, result, gai_strerror(result));
|
||||
}
|
||||
|
||||
TRY_BEGIN()
|
||||
{
|
||||
int fd = socket(hostBadAddress->ai_family, hostBadAddress->ai_socktype, hostBadAddress->ai_protocol);
|
||||
THROW_ON_SYS_ERROR(fd == -1, HostConnectError, "unable to create socket");
|
||||
|
||||
// Set socket non-blocking
|
||||
int flags;
|
||||
THROW_ON_SYS_ERROR((flags = fcntl(fd, F_GETFL)) == -1, ProtocolError, "unable to get flags");
|
||||
THROW_ON_SYS_ERROR(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1, ProtocolError, "unable to set O_NONBLOCK");
|
||||
|
||||
// Attempt connection
|
||||
CHECK(connect(fd, hostBadAddress->ai_addr, hostBadAddress->ai_addrlen) == -1);
|
||||
|
||||
// Create file descriptor write and wait for timeout
|
||||
IoWrite *write = NULL;
|
||||
TEST_ASSIGN(write, ioFdWriteNew(STR(hostBad), fd, 100), "new fd write");
|
||||
|
||||
TEST_RESULT_BOOL(ioWriteReadyP(write), false, "write is not ready");
|
||||
TEST_ERROR(
|
||||
ioWriteReadyP(write, .error = true), FileWriteError, "timeout after 100ms waiting for write to '172.31.255.255'");
|
||||
}
|
||||
FINALLY()
|
||||
{
|
||||
// This needs to be freed or valgrind will complain
|
||||
freeaddrinfo(hostBadAddress);
|
||||
}
|
||||
TRY_END();
|
||||
}
|
||||
|
||||
FUNCTION_HARNESS_RESULT_VOID();
|
||||
|
@ -144,7 +144,8 @@ testRun(void)
|
||||
TEST_ASSIGN(session, sckSessionNew(sckSessionTypeClient, fd, strNew(hostBad), 7777, 100), "new socket");
|
||||
|
||||
TEST_ERROR(
|
||||
sckSessionReadyWrite(session), ProtocolError, "timeout after 100ms waiting for write to '172.31.255.255:7777'");
|
||||
ioWriteReadyP(sckSessionIoWrite(session), .error = true), FileWriteError,
|
||||
"timeout after 100ms waiting for write to '172.31.255.255:7777'");
|
||||
|
||||
TEST_RESULT_VOID(sckSessionFree(session), "free socket session");
|
||||
|
||||
@ -264,7 +265,7 @@ testRun(void)
|
||||
|
||||
HARNESS_FORK_PARENT_BEGIN()
|
||||
{
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("test client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0)));
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("test client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 1000));
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("certificate error on invalid ca path");
|
||||
@ -379,7 +380,7 @@ testRun(void)
|
||||
|
||||
HARNESS_FORK_PARENT_BEGIN()
|
||||
{
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("test client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0)));
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("test client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 1000));
|
||||
ioBufferSizeSet(12);
|
||||
|
||||
TEST_ASSIGN(
|
||||
@ -392,25 +393,6 @@ testRun(void)
|
||||
TEST_ASSIGN(session, ioClientOpen(client), "open client");
|
||||
TlsSession *tlsSession = (TlsSession *)session->driver;
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("socket read/write ready");
|
||||
|
||||
TimeMSec timeout = 5757;
|
||||
TEST_RESULT_BOOL(sckReadyRetry(-1, EINTR, true, &timeout, 0), true, "first retry does not modify timeout");
|
||||
TEST_RESULT_UINT(timeout, 5757, " check timeout");
|
||||
|
||||
timeout = 0;
|
||||
TEST_RESULT_BOOL(sckReadyRetry(-1, EINTR, false, &timeout, timeMSec() + 10000), true, "retry before timeout");
|
||||
TEST_RESULT_BOOL(timeout > 0, true, " check timeout");
|
||||
|
||||
TEST_RESULT_BOOL(sckReadyRetry(-1, EINTR, false, &timeout, timeMSec()), false, "no retry after timeout");
|
||||
TEST_ERROR(
|
||||
sckReadyRetry(-1, EINVAL, true, &timeout, 0), KernelError, "unable to poll socket: [22] Invalid argument");
|
||||
|
||||
TEST_RESULT_BOOL(sckReadyRead(tlsSession->socketSession->fd, 0), false, "socket is not read ready");
|
||||
TEST_RESULT_BOOL(sckReadyWrite(tlsSession->socketSession->fd, 100), true, "socket is write ready");
|
||||
TEST_RESULT_VOID(sckSessionReadyWrite(tlsSession->socketSession), "socket session is write ready");
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("uncovered errors");
|
||||
|
||||
@ -459,11 +441,11 @@ testRun(void)
|
||||
hrnTlsServerSleep(500);
|
||||
|
||||
output = bufNew(12);
|
||||
tlsSession->socketSession->timeout = 100;
|
||||
((IoFdRead *)tlsSession->socketSession->read->driver)->timeout = 100;
|
||||
TEST_ERROR_FMT(
|
||||
ioRead(ioSessionIoRead(session), output), ProtocolError,
|
||||
ioRead(ioSessionIoRead(session), output), FileReadError,
|
||||
"timeout after 100ms waiting for read from '%s:%u'", strZ(hrnTlsServerHost()), hrnTlsServerPort());
|
||||
tlsSession->socketSession->timeout = 5000;
|
||||
((IoFdRead *)tlsSession->socketSession->read->driver)->timeout = 5000;
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("second protocol exchange");
|
||||
|
@ -26,7 +26,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("client read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("client write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("client write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
StringList *argList = strLstNew();
|
||||
@ -46,7 +46,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolClient *client = protocolClientNew(strNew("test"), strNew("config"), read, write);
|
||||
|
@ -65,7 +65,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("client read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("client write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("client write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Set options
|
||||
@ -109,7 +109,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Create client
|
||||
|
@ -172,7 +172,7 @@ testRun(void)
|
||||
// Setup handler for remote storage protocol
|
||||
IoRead *read = ioFdReadNew(strNew("storage server read"), HARNESS_FORK_CHILD_READ(), 60000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("storage server write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("storage server write"), HARNESS_FORK_CHILD_WRITE(), 1000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolServer *server = protocolServerNew(strNew("storage test server"), strNew("test"), read, write);
|
||||
@ -187,7 +187,7 @@ testRun(void)
|
||||
// Create client
|
||||
IoRead *read = ioFdReadNew(strNew("storage client read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 60000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("storage client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("storage client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 1000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
ProtocolClient *client = protocolClientNew(strNew("storage test client"), strNew("test"), read, write);
|
||||
|
@ -418,7 +418,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Various bogus greetings
|
||||
@ -492,7 +492,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("client read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Various bogus greetings
|
||||
@ -598,7 +598,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("client read"), HARNESS_FORK_CHILD_READ(), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("client write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("client write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Check greeting
|
||||
@ -660,7 +660,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_PARENT_READ_PROCESS(0), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0));
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Send greeting
|
||||
@ -737,7 +737,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_CHILD_READ(), 10000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Greeting with noop
|
||||
@ -763,7 +763,7 @@ testRun(void)
|
||||
{
|
||||
IoRead *read = ioFdReadNew(strNew("server read"), HARNESS_FORK_CHILD_READ(), 10000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_CHILD_WRITE());
|
||||
IoWrite *write = ioFdWriteNew(strNew("server write"), HARNESS_FORK_CHILD_WRITE(), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Greeting with noop
|
||||
@ -807,7 +807,7 @@ testRun(void)
|
||||
strNewFmt("client %u read", clientIdx), HARNESS_FORK_PARENT_READ_PROCESS(clientIdx), 2000);
|
||||
ioReadOpen(read);
|
||||
IoWrite *write = ioFdWriteNew(
|
||||
strNewFmt("client %u write", clientIdx), HARNESS_FORK_PARENT_WRITE_PROCESS(clientIdx));
|
||||
strNewFmt("client %u write", clientIdx), HARNESS_FORK_PARENT_WRITE_PROCESS(clientIdx), 2000);
|
||||
ioWriteOpen(write);
|
||||
|
||||
TEST_ASSIGN(
|
||||
|
@ -280,7 +280,7 @@ testRun(void)
|
||||
|
||||
HARNESS_FORK_PARENT_BEGIN()
|
||||
{
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("azure client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0)));
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("azure client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000));
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("test against local host");
|
||||
|
@ -380,7 +380,7 @@ testRun(void)
|
||||
|
||||
HARNESS_FORK_PARENT_BEGIN()
|
||||
{
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("s3 client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0)));
|
||||
hrnTlsClientBegin(ioFdWriteNew(strNew("s3 client write"), HARNESS_FORK_PARENT_WRITE_PROCESS(0), 2000));
|
||||
|
||||
Storage *s3 = storageS3New(
|
||||
path, true, NULL, bucket, endPoint, storageS3UriStyleHost, region, accessKey, secretAccessKey, NULL, 16, 2,
|
||||
|
Reference in New Issue
Block a user