You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-09-16 09:06:18 +02:00
Retry failed reads on object stores.
There is currently a retry if the initial get request fails (depending on the error code) but if the read fails later on while fetching blocks of data it is fatal. In most cases there is a higher level retry (e.g. restore) but restarting the restore job might be expensive depending on how many files are being restored.
Add a retry that will catch read errors and retry from where the last data was successfully read.
A bit of history -- this patch was first started three years ago but the memory context model at that time would not allow the interface (StorageRead) to own the driver (e.g. StorageReadS3). Subsequent improvements in memory contexts have allowed this ownership model and in fact it is now the default so no ownership changes are required in this patch except in StorageReadRemote which was not updated in f6e3073
.
This commit is contained in:
@@ -1,6 +1,17 @@
|
||||
<release date="XXXX-XX-XX" version="2.56.0dev" title="UNDER DEVELOPMENT">
|
||||
<release-core-list>
|
||||
<release-improvement-list>
|
||||
<release-item>
|
||||
<github-pull-request id="2642"/>
|
||||
|
||||
<release-item-contributor-list>
|
||||
<release-item-contributor id="david.steele"/>
|
||||
<release-item-reviewer id="david.christensen"/>
|
||||
</release-item-contributor-list>
|
||||
|
||||
<p>Retry failed reads on object stores.</p>
|
||||
</release-item>
|
||||
|
||||
<release-item>
|
||||
<github-pull-request id="2625"/>
|
||||
|
||||
|
@@ -118,6 +118,27 @@ storageReadAzureEof(THIS_VOID)
|
||||
FUNCTION_TEST_RETURN(BOOL, ioReadEof(httpResponseIoRead(this->httpResponse)));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Close the file
|
||||
***********************************************************************************************************************************/
|
||||
static void
|
||||
storageReadAzureClose(THIS_VOID)
|
||||
{
|
||||
THIS(StorageReadAzure);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STORAGE_READ_AZURE, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(this->httpResponse != NULL);
|
||||
|
||||
httpResponseFree(this->httpResponse);
|
||||
this->httpResponse = NULL;
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
FN_EXTERN StorageRead *
|
||||
storageReadAzureNew(
|
||||
@@ -150,6 +171,7 @@ storageReadAzureNew(
|
||||
.ignoreMissing = ignoreMissing,
|
||||
.offset = offset,
|
||||
.limit = varDup(limit),
|
||||
.retry = true,
|
||||
.version = version,
|
||||
.versionId = strDup(versionId),
|
||||
|
||||
@@ -158,6 +180,7 @@ storageReadAzureNew(
|
||||
.eof = storageReadAzureEof,
|
||||
.open = storageReadAzureOpen,
|
||||
.read = storageReadAzure,
|
||||
.close = storageReadAzureClose,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
@@ -121,6 +121,27 @@ storageReadGcsEof(THIS_VOID)
|
||||
FUNCTION_TEST_RETURN(BOOL, ioReadEof(httpResponseIoRead(this->httpResponse)));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Close the file
|
||||
***********************************************************************************************************************************/
|
||||
static void
|
||||
storageReadGcsClose(THIS_VOID)
|
||||
{
|
||||
THIS(StorageReadGcs);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STORAGE_READ_GCS, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(this->httpResponse != NULL);
|
||||
|
||||
httpResponseFree(this->httpResponse);
|
||||
this->httpResponse = NULL;
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
FN_EXTERN StorageRead *
|
||||
storageReadGcsNew(
|
||||
@@ -153,6 +174,7 @@ storageReadGcsNew(
|
||||
.ignoreMissing = ignoreMissing,
|
||||
.offset = offset,
|
||||
.limit = varDup(limit),
|
||||
.retry = true,
|
||||
.version = version,
|
||||
.versionId = strDup(versionId),
|
||||
|
||||
@@ -161,6 +183,7 @@ storageReadGcsNew(
|
||||
.eof = storageReadGcsEof,
|
||||
.open = storageReadGcsOpen,
|
||||
.read = storageReadGcs,
|
||||
.close = storageReadGcsClose,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
@@ -6,6 +6,7 @@ Storage Read Interface
|
||||
#include "common/debug.h"
|
||||
#include "common/log.h"
|
||||
#include "common/memContext.h"
|
||||
#include "common/wait.h"
|
||||
#include "storage/read.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@@ -14,6 +15,8 @@ Object type
|
||||
struct StorageRead
|
||||
{
|
||||
StorageReadPub pub; // Publicly accessible variables
|
||||
void *driver; // Driver
|
||||
uint64_t bytesRead; // Bytes that have been successfully read
|
||||
};
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@@ -24,9 +27,173 @@ Macros for function logging
|
||||
#define FUNCTION_LOG_STORAGE_READ_INTERFACE_FORMAT(value, buffer, bufferSize) \
|
||||
objNameToLog(&value, "StorageReadInterface", buffer, bufferSize)
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Open the file
|
||||
***********************************************************************************************************************************/
|
||||
static bool
|
||||
storageReadOpen(THIS_VOID)
|
||||
{
|
||||
THIS(StorageRead);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STORAGE_READ, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_LOG_RETURN(BOOL, this->pub.interface->ioInterface.open(this->driver));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Read from a file and retry when there is a read failure
|
||||
***********************************************************************************************************************************/
|
||||
static size_t
|
||||
storageRead(THIS_VOID, Buffer *const buffer, const bool block)
|
||||
{
|
||||
THIS(StorageRead);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STORAGE_READ, this);
|
||||
FUNCTION_LOG_PARAM(BUFFER, buffer);
|
||||
FUNCTION_LOG_PARAM(BOOL, block);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
const size_t bufUsedBegin = bufUsed(buffer);
|
||||
size_t result = 0;
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Use a specific number of tries here instead of a timeout because the underlying operations already have timeouts and
|
||||
// failures will generally happen after some delay, so it is not clear what timeout would be appropriate here.
|
||||
unsigned int try = this->pub.interface->retry ? 3 : 1;
|
||||
|
||||
// While tries remaining
|
||||
while (try > 0)
|
||||
{
|
||||
TRY_BEGIN()
|
||||
{
|
||||
this->pub.interface->ioInterface.read(this->driver, buffer, block);
|
||||
|
||||
// Account for bytes that have been read
|
||||
result += bufUsed(buffer) - bufUsedBegin;
|
||||
this->bytesRead += bufUsed(buffer) - bufUsedBegin;
|
||||
|
||||
// Set try to 1 to exit the loop
|
||||
try = 1;
|
||||
}
|
||||
CATCH_ANY()
|
||||
{
|
||||
// If there is another try remaining then close the file and reopen it to the new position, taking into account any
|
||||
// bytes that have already been read
|
||||
if (try > 1)
|
||||
{
|
||||
// Close the file
|
||||
this->pub.interface->ioInterface.close(this->driver);
|
||||
|
||||
// Ignore partial reads and restart from the last successful read
|
||||
bufUsedSet(buffer, bufUsedBegin);
|
||||
|
||||
// The file must not be missing on retry. If we got here then the file must have existed originally and it if is
|
||||
// missing now we want a hard error.
|
||||
this->pub.interface->ignoreMissing = false;
|
||||
|
||||
// Update offset and limit (when present) based on how many bytes have been successfully read
|
||||
this->pub.interface->offset = this->pub.offset + this->bytesRead;
|
||||
|
||||
if (this->pub.limit != NULL)
|
||||
{
|
||||
varFree(this->pub.interface->limit);
|
||||
|
||||
MEM_CONTEXT_OBJ_BEGIN(this->driver)
|
||||
{
|
||||
this->pub.interface->limit = varNewUInt64(varUInt64(this->pub.limit) - this->bytesRead);
|
||||
}
|
||||
MEM_CONTEXT_OBJ_END();
|
||||
}
|
||||
|
||||
// Open file with new offset/limit
|
||||
this->pub.interface->ioInterface.open(this->driver);
|
||||
}
|
||||
else
|
||||
RETHROW();
|
||||
}
|
||||
TRY_END();
|
||||
|
||||
try--;
|
||||
}
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(SIZE, result);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Close the file
|
||||
***********************************************************************************************************************************/
|
||||
static void
|
||||
storageReadClose(THIS_VOID)
|
||||
{
|
||||
THIS(StorageRead);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STORAGE_READ, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
this->pub.interface->ioInterface.close(this->driver);
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Has file reached EOF?
|
||||
***********************************************************************************************************************************/
|
||||
static bool
|
||||
storageReadEof(THIS_VOID)
|
||||
{
|
||||
THIS(StorageRead);
|
||||
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(STORAGE_READ, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(BOOL, this->pub.interface->ioInterface.eof(this->driver));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get file descriptor
|
||||
***********************************************************************************************************************************/
|
||||
static int
|
||||
storageReadFd(const THIS_VOID)
|
||||
{
|
||||
THIS(const StorageRead);
|
||||
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(STORAGE_READ, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(INT, this->pub.interface->ioInterface.fd(this->driver));
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
static const IoReadInterface storageIoReadInterface =
|
||||
{
|
||||
.open = storageReadOpen,
|
||||
.read = storageRead,
|
||||
.close = storageReadClose,
|
||||
.eof = storageReadEof,
|
||||
.fd = storageReadFd,
|
||||
};
|
||||
|
||||
FN_EXTERN StorageRead *
|
||||
storageReadNew(void *const driver, const StorageReadInterface *const interface)
|
||||
storageReadNew(void *const driver, StorageReadInterface *const interface)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM_P(VOID, driver);
|
||||
@@ -37,15 +204,29 @@ storageReadNew(void *const driver, const StorageReadInterface *const interface)
|
||||
|
||||
ASSERT(driver != NULL);
|
||||
ASSERT(interface != NULL);
|
||||
ASSERT(interface->ioInterface.eof != NULL);
|
||||
ASSERT(interface->ioInterface.close != NULL);
|
||||
ASSERT(interface->ioInterface.open != NULL);
|
||||
ASSERT(interface->ioInterface.read != NULL);
|
||||
|
||||
// Remove fd method if it does not exist in the driver
|
||||
IoReadInterface storageIoReadInterfaceCopy = storageIoReadInterface;
|
||||
|
||||
if (interface->ioInterface.fd == NULL)
|
||||
storageIoReadInterfaceCopy.fd = NULL;
|
||||
|
||||
OBJ_NEW_BEGIN(StorageRead, .childQty = MEM_CONTEXT_QTY_MAX)
|
||||
{
|
||||
*this = (StorageRead)
|
||||
{
|
||||
.driver = objMove(driver, memContextCurrent()),
|
||||
.pub =
|
||||
{
|
||||
.interface = interface,
|
||||
.io = ioReadNew(driver, interface->ioInterface),
|
||||
.io = ioReadNew(this, storageIoReadInterfaceCopy),
|
||||
.offset = interface->offset,
|
||||
.limit = varDup(interface->limit),
|
||||
.ignoreMissing = interface->ignoreMissing,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
@@ -44,7 +44,7 @@ storageReadIo(const StorageRead *const this)
|
||||
FN_INLINE_ALWAYS const Variant *
|
||||
storageReadLimit(const StorageRead *const this)
|
||||
{
|
||||
return storageReadInterface(this)->limit;
|
||||
return THIS_PUB(StorageRead)->limit;
|
||||
}
|
||||
|
||||
// File name
|
||||
@@ -58,7 +58,7 @@ storageReadName(const StorageRead *const this)
|
||||
FN_INLINE_ALWAYS uint64_t
|
||||
storageReadOffset(const StorageRead *const this)
|
||||
{
|
||||
return storageReadInterface(this)->offset;
|
||||
return THIS_PUB(StorageRead)->offset;
|
||||
}
|
||||
|
||||
// Get file type
|
||||
|
@@ -17,21 +17,25 @@ typedef struct StorageReadInterface
|
||||
unsigned int compressLevel; // Level to use for compression
|
||||
bool ignoreMissing;
|
||||
uint64_t offset; // Where to start reading in the file
|
||||
const Variant *limit; // Limit how many bytes are read (NULL for no limit)
|
||||
Variant *limit; // Limit how many bytes are read (NULL for no limit)
|
||||
bool retry; // Are read retries allowed?
|
||||
bool version; // Read version
|
||||
const String *versionId; // File version to read
|
||||
IoReadInterface ioInterface;
|
||||
} StorageReadInterface;
|
||||
|
||||
FN_EXTERN StorageRead *storageReadNew(void *driver, const StorageReadInterface *interface);
|
||||
FN_EXTERN StorageRead *storageReadNew(void *driver, StorageReadInterface *interface);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
typedef struct StorageReadPub
|
||||
{
|
||||
const StorageReadInterface *interface; // File data (name, driver type, etc.)
|
||||
StorageReadInterface *interface; // File data (name, driver type, etc.)
|
||||
IoRead *io; // Read interface
|
||||
uint64_t offset; // Where to start reading in the file
|
||||
const Variant *limit; // Limit how many bytes are read (NULL for no limit)
|
||||
bool ignoreMissing; // Ignore missing file?
|
||||
} StorageReadPub;
|
||||
|
||||
// Read interface
|
||||
|
@@ -309,11 +309,11 @@ storageReadRemoteNew(
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
this->read = storageReadNew(OBJ_NAME(this, StorageRead::StorageReadRemote), &this->interface);
|
||||
}
|
||||
OBJ_NEW_END();
|
||||
|
||||
this->read = storageReadNew(OBJ_NAME(this, StorageRead::StorageReadRemote), &this->interface);
|
||||
|
||||
ASSERT(this != NULL);
|
||||
FUNCTION_LOG_RETURN(STORAGE_READ, this->read);
|
||||
}
|
||||
|
@@ -118,6 +118,27 @@ storageReadS3Eof(THIS_VOID)
|
||||
FUNCTION_TEST_RETURN(BOOL, ioReadEof(httpResponseIoRead(this->httpResponse)));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Close the file
|
||||
***********************************************************************************************************************************/
|
||||
static void
|
||||
storageReadS3Close(THIS_VOID)
|
||||
{
|
||||
THIS(StorageReadS3);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STORAGE_READ_S3, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(this->httpResponse != NULL);
|
||||
|
||||
httpResponseFree(this->httpResponse);
|
||||
this->httpResponse = NULL;
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
FN_EXTERN StorageRead *
|
||||
storageReadS3New(
|
||||
@@ -151,6 +172,7 @@ storageReadS3New(
|
||||
.ignoreMissing = ignoreMissing,
|
||||
.offset = offset,
|
||||
.limit = varDup(limit),
|
||||
.retry = true,
|
||||
.version = version,
|
||||
.versionId = strDup(versionId),
|
||||
|
||||
@@ -159,6 +181,7 @@ storageReadS3New(
|
||||
.eof = storageReadS3Eof,
|
||||
.open = storageReadS3Open,
|
||||
.read = storageReadS3,
|
||||
.close = storageReadS3Close,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
@@ -619,6 +619,7 @@ unit:
|
||||
- storage/s3/read
|
||||
- storage/s3/storage
|
||||
- storage/s3/write
|
||||
- storage/read
|
||||
- storage/helper
|
||||
|
||||
include:
|
||||
|
@@ -122,6 +122,7 @@ typedef struct TestResponseParam
|
||||
unsigned int code;
|
||||
const char *header;
|
||||
const char *content;
|
||||
const Variant *contentSize;
|
||||
} TestResponseParam;
|
||||
|
||||
#define testResponseP(write, ...) \
|
||||
@@ -163,7 +164,7 @@ testResponse(IoWrite *write, TestResponseParam param)
|
||||
"content-length:%zu\r\n"
|
||||
"\r\n"
|
||||
"%s",
|
||||
strlen(param.content), param.content);
|
||||
param.contentSize != NULL ? varUInt(param.contentSize) : strlen(param.content), param.content);
|
||||
}
|
||||
else
|
||||
strCatZ(response, "\r\n");
|
||||
@@ -526,6 +527,51 @@ testRun(void)
|
||||
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(21)))),
|
||||
"this is a sample file", "get file");
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get file with retry");
|
||||
|
||||
testRequestP(service, HTTP_VERB_GET, "/file.txt");
|
||||
testResponseP(service, .content = "12345678911234567892", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
testRequestP(service, HTTP_VERB_GET, "/file.txt", .range = "20-");
|
||||
testResponseP(service, .content = "1234567893");
|
||||
|
||||
const size_t ioBufferSizeDefault = ioBufferSize();
|
||||
ioBufferSizeSet(20);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt")))), "123456789112345678921234567893",
|
||||
"get file");
|
||||
|
||||
ioBufferSizeSet(ioBufferSizeDefault);
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get file with retry, offset, and limit");
|
||||
|
||||
testRequestP(service, HTTP_VERB_GET, "/file.txt", .range = "1-29");
|
||||
testResponseP(service, .content = "23456789112345678921X", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptAbort(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
testRequestP(service, HTTP_VERB_GET, "/file.txt", .range = "21-29");
|
||||
testResponseP(service, .content = "23456789");
|
||||
|
||||
ioBufferSizeSet(20);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(29)))),
|
||||
"2345678911234567892123456789", "get file");
|
||||
|
||||
ioBufferSizeSet(ioBufferSizeDefault);
|
||||
|
||||
// Close to reset buffer size
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get zero-length file");
|
||||
|
||||
|
@@ -153,6 +153,7 @@ typedef struct TestResponseParam
|
||||
bool multiPart;
|
||||
const char *header;
|
||||
const char *content;
|
||||
const Variant *contentSize;
|
||||
} TestResponseParam;
|
||||
|
||||
#define testResponseP(write, ...) \
|
||||
@@ -198,7 +199,7 @@ testResponse(IoWrite *write, TestResponseParam param)
|
||||
"content-length:%zu\r\n"
|
||||
"\r\n"
|
||||
"%s",
|
||||
strlen(param.content), param.content);
|
||||
param.contentSize != NULL ? varUInt(param.contentSize) : strlen(param.content), param.content);
|
||||
}
|
||||
else
|
||||
strCatZ(response, "\r\n");
|
||||
@@ -445,6 +446,51 @@ testRun(void)
|
||||
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(21)))),
|
||||
"this is a sample file", "get file");
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get file with retry");
|
||||
|
||||
testRequestP(service, HTTP_VERB_GET, .object = "file.txt", .query = "alt=media");
|
||||
testResponseP(service, .content = "12345678911234567892", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
testRequestP(service, HTTP_VERB_GET, .object = "file.txt", .query = "alt=media", .range = "20-");
|
||||
testResponseP(service, .content = "1234567893");
|
||||
|
||||
const size_t ioBufferSizeDefault = ioBufferSize();
|
||||
ioBufferSizeSet(20);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt")))), "123456789112345678921234567893",
|
||||
"get file");
|
||||
|
||||
ioBufferSizeSet(ioBufferSizeDefault);
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get file with retry, offset, and limit");
|
||||
|
||||
testRequestP(service, HTTP_VERB_GET, .object = "file.txt", .query = "alt=media", .range = "1-29");
|
||||
testResponseP(service, .content = "23456789112345678921X", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptAbort(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
testRequestP(service, HTTP_VERB_GET, .object = "file.txt", .query = "alt=media", .range = "21-29");
|
||||
testResponseP(service, .content = "23456789");
|
||||
|
||||
ioBufferSizeSet(20);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(29)))),
|
||||
"2345678911234567892123456789", "get file");
|
||||
|
||||
ioBufferSizeSet(ioBufferSizeDefault);
|
||||
|
||||
// Close to reset buffer size
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("switch to auto auth");
|
||||
|
||||
|
@@ -947,8 +947,7 @@ testRun(void)
|
||||
HRN_SYSTEM_FMT("touch %s", strZ(fileName));
|
||||
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
TEST_RESULT_INT(
|
||||
ioReadFd(storageReadIo(file)), ((StorageReadPosix *)ioReadDriver(storageReadIo(file)))->fd, "check read fd");
|
||||
TEST_RESULT_INT(ioReadFd(storageReadIo(file)), ((StorageReadPosix *)file->driver)->fd, "check read fd");
|
||||
TEST_RESULT_VOID(ioReadClose(storageReadIo(file)), "close file");
|
||||
}
|
||||
|
||||
@@ -1212,13 +1211,13 @@ testRun(void)
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
|
||||
// Close the file descriptor so operations will fail
|
||||
close(((StorageReadPosix *)ioReadDriver(storageReadIo(file)))->fd);
|
||||
close(((StorageReadPosix *)file->driver)->fd);
|
||||
|
||||
TEST_ERROR_FMT(
|
||||
ioRead(storageReadIo(file), outBuffer), FileReadError, "unable to read '%s': [9] Bad file descriptor", strZ(fileName));
|
||||
|
||||
// Set file descriptor to -1 so the close on free will not fail
|
||||
((StorageReadPosix *)ioReadDriver(storageReadIo(file)))->fd = -1;
|
||||
((StorageReadPosix *)file->driver)->fd = -1;
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("incremental load");
|
||||
@@ -1261,8 +1260,7 @@ testRun(void)
|
||||
TEST_RESULT_VOID(ioRead(storageReadIo(file), outBuffer), "no data to load");
|
||||
TEST_RESULT_UINT(bufUsed(outBuffer), 0, "buffer is empty");
|
||||
|
||||
TEST_RESULT_VOID(
|
||||
storageReadPosix(ioReadDriver(storageReadIo(file)), outBuffer, true), "no data to load from driver either");
|
||||
TEST_RESULT_VOID(storageReadPosix(file->driver, outBuffer, true), "no data to load from driver either");
|
||||
TEST_RESULT_UINT(bufUsed(outBuffer), 0, "buffer is empty");
|
||||
|
||||
TEST_RESULT_BOOL(bufEq(buffer, expectedBuffer), true, "check file contents (all loaded)");
|
||||
|
@@ -281,9 +281,7 @@ testRun(void)
|
||||
StorageRead *fileReadRaw;
|
||||
TEST_ASSIGN(fileReadRaw, storageNewReadP(storageRepo, STRDEF("test.txt")), "new file");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(fileReadRaw)), true, "open read");
|
||||
TEST_ASSIGN(
|
||||
size, storageReadRemote(ioReadDriver(storageReadIo(fileReadRaw)), buffer, true),
|
||||
"read file and save returned size");
|
||||
TEST_ASSIGN(size, storageReadRemote(fileReadRaw->driver, buffer, true), "read file and save returned size");
|
||||
TEST_RESULT_UINT(size, bufUsed(buffer), "check returned size");
|
||||
TEST_RESULT_UINT(size, bufUsed(contentBuf), "returned size should be the same as the file size");
|
||||
TEST_RESULT_VOID(ioReadClose(storageReadIo(fileReadRaw)), "close");
|
||||
@@ -301,8 +299,8 @@ testRun(void)
|
||||
TEST_RESULT_BOOL(bufEq(storageGetP(fileRead), contentBuf), true, "get file");
|
||||
TEST_RESULT_BOOL(storageReadIgnoreMissing(fileRead), false, "check ignore missing");
|
||||
TEST_RESULT_STR_Z(storageReadName(fileRead), TEST_PATH "/repo128/test.txt", "check name");
|
||||
TEST_RESULT_UINT(storageReadRemote(ioReadDriver(storageReadIo(fileRead)), bufNew(32), false), 0, "nothing more to read");
|
||||
TEST_RESULT_UINT(((StorageReadRemote *)ioReadDriver(storageReadIo(fileRead)))->protocolReadBytes, bufSize(contentBuf), "check read size");
|
||||
TEST_RESULT_UINT(storageReadRemote(fileRead->driver, bufNew(32), false), 0, "nothing more to read");
|
||||
TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, bufSize(contentBuf), "check read size");
|
||||
|
||||
// Enable protocol compression in the storage object
|
||||
((StorageRemote *)storageDriver(storageRepo))->compressLevel = 3;
|
||||
@@ -312,7 +310,7 @@ testRun(void)
|
||||
|
||||
TEST_ASSIGN(fileRead, storageNewReadP(storageRepo, STRDEF("test.txt"), .limit = VARUINT64(11)), "get file");
|
||||
TEST_RESULT_STR_Z(strNewBuf(storageGetP(fileRead)), "BABABABABAB", "check contents");
|
||||
TEST_RESULT_UINT(((StorageReadRemote *)ioReadDriver(storageReadIo(fileRead)))->protocolReadBytes, 11, "check read size");
|
||||
TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, 11, "check read size");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("read partial file then close");
|
||||
@@ -364,8 +362,7 @@ testRun(void)
|
||||
TEST_RESULT_BOOL(bufEq(storageGetP(fileRead), contentBuf), true, "check contents");
|
||||
// We don't know how much protocol compression there will be exactly, but make sure this is some
|
||||
TEST_RESULT_BOOL(
|
||||
((StorageReadRemote *)ioReadDriver(storageReadIo(fileRead)))->protocolReadBytes < bufSize(contentBuf), true,
|
||||
"check compressed read size");
|
||||
((StorageReadRemote *)fileRead->driver)->protocolReadBytes < bufSize(contentBuf), true, "check compressed read size");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("file missing");
|
||||
|
@@ -195,6 +195,7 @@ typedef struct TestResponseParam
|
||||
const char *http;
|
||||
const char *header;
|
||||
const char *content;
|
||||
const Variant *contentSize;
|
||||
} TestResponseParam;
|
||||
|
||||
#define testResponseP(write, ...) \
|
||||
@@ -232,7 +233,7 @@ testResponse(IoWrite *write, TestResponseParam param)
|
||||
"content-length:%zu\r\n"
|
||||
"\r\n"
|
||||
"%s",
|
||||
strlen(param.content), param.content);
|
||||
param.contentSize != NULL ? varUInt(param.contentSize) : strlen(param.content), param.content);
|
||||
}
|
||||
else
|
||||
strCatZ(response, "\r\n");
|
||||
@@ -429,7 +430,7 @@ testRun(void)
|
||||
}
|
||||
HRN_FORK_CHILD_END();
|
||||
|
||||
HRN_FORK_CHILD_BEGIN(.prefix = "auth server", .timeout = 10000)
|
||||
HRN_FORK_CHILD_BEGIN(.prefix = "auth server", .timeout = 15000)
|
||||
{
|
||||
TEST_RESULT_VOID(hrnServerRunP(HRN_FORK_CHILD_READ(), hrnServerProtocolSocket, testPortAuth), "auth server");
|
||||
}
|
||||
@@ -492,6 +493,86 @@ testRun(void)
|
||||
strNewBuf(storageGetP(storageNewReadP(s3, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(21)))),
|
||||
"this is a sample file", "get file");
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get file with retry");
|
||||
|
||||
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt");
|
||||
testResponseP(service, .content = "12345678911234567892", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt", .range = "20-");
|
||||
testResponseP(service, .content = "1234567893");
|
||||
|
||||
const size_t ioBufferSizeDefault = ioBufferSize();
|
||||
ioBufferSizeSet(20);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
strNewBuf(storageGetP(storageNewReadP(s3, STRDEF("file.txt")))),
|
||||
"123456789112345678921234567893", "get file");
|
||||
|
||||
ioBufferSizeSet(ioBufferSizeDefault);
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get file with retry, offset, and limit");
|
||||
|
||||
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt", .range = "1-29");
|
||||
testResponseP(service, .content = "23456789112345678921X", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptAbort(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt", .range = "21-29");
|
||||
testResponseP(service, .content = "23456789");
|
||||
|
||||
ioBufferSizeSet(20);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
strNewBuf(storageGetP(storageNewReadP(s3, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(29)))),
|
||||
"2345678911234567892123456789", "get file");
|
||||
|
||||
ioBufferSizeSet(ioBufferSizeDefault);
|
||||
|
||||
// Close to reset buffer size
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get file all retries fail");
|
||||
|
||||
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt", .range = "1-29");
|
||||
testResponseP(service, .content = "X", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt", .range = "1-29");
|
||||
testResponseP(service, .content = "X", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt", .range = "1-29");
|
||||
testResponseP(service, .content = "X", .contentSize = VARUINT(30));
|
||||
|
||||
hrnServerScriptClose(service);
|
||||
hrnServerScriptAccept(service);
|
||||
|
||||
bool errorCaught = false;
|
||||
|
||||
TRY_BEGIN()
|
||||
{
|
||||
storageGetP(storageNewReadP(s3, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(29)));
|
||||
}
|
||||
CATCH_ANY()
|
||||
{
|
||||
errorCaught = true;
|
||||
}
|
||||
TRY_END();
|
||||
|
||||
TEST_RESULT_BOOL(errorCaught, true, "check error was caught");
|
||||
|
||||
// -----------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get zero-length file");
|
||||
|
||||
|
@@ -4685,7 +4685,7 @@ testRun(void)
|
||||
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName), "new read file (defaults)");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
TEST_RESULT_VOID(storageReadSftpClose((StorageReadSftp *)ioReadDriver(storageReadIo(file))), "close file");
|
||||
TEST_RESULT_VOID(storageReadSftpClose((StorageReadSftp *)file->driver), "close file");
|
||||
|
||||
memContextFree(objMemContext((StorageSftp *)storageDriver(storageTest)));
|
||||
|
||||
@@ -4713,7 +4713,7 @@ testRun(void)
|
||||
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName), "new read file (defaults)");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
close(ioSessionFd(((StorageReadSftp *)ioReadDriver(storageReadIo(file)))->storage->ioSession));
|
||||
close(ioSessionFd(((StorageReadSftp *)file->driver)->storage->ioSession));
|
||||
|
||||
memContextFree(objMemContext((StorageSftp *)storageDriver(storageTest)));
|
||||
|
||||
@@ -4741,8 +4741,8 @@ testRun(void)
|
||||
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName), "new read file (defaults)");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
((StorageReadSftp *)ioReadDriver(storageReadIo(file)))->sftpHandle = NULL;
|
||||
TEST_RESULT_VOID(storageReadSftpClose(ioReadDriver(storageReadIo(file))), "close file null sftpHandle");
|
||||
((StorageReadSftp *)file->driver)->sftpHandle = NULL;
|
||||
TEST_RESULT_VOID(storageReadSftpClose(file->driver), "close file null sftpHandle");
|
||||
|
||||
memContextFree(objMemContext((StorageSftp *)storageDriver(storageTest)));
|
||||
|
||||
@@ -4775,7 +4775,7 @@ testRun(void)
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName), "new read file (defaults)");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
TEST_ERROR(
|
||||
storageReadSftpClose(ioReadDriver(storageReadIo(file))), FileCloseError,
|
||||
storageReadSftpClose(file->driver), FileCloseError,
|
||||
"timeout closing file '" TEST_PATH "/readtest.txt': libssh2 error [-37]");
|
||||
|
||||
memContextFree(objMemContext((StorageSftp *)storageDriver(storageTest)));
|
||||
@@ -4809,7 +4809,7 @@ testRun(void)
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName), "new read file (defaults)");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
TEST_ERROR(
|
||||
storageReadSftpClose(ioReadDriver(storageReadIo(file))), FileCloseError,
|
||||
storageReadSftpClose(file->driver), FileCloseError,
|
||||
"unable to close file '" TEST_PATH "/readtest.txt' after read: libssh2 errno [-31]: sftp errno [4]");
|
||||
|
||||
memContextFree(objMemContext((StorageSftp *)storageDriver(storageTest)));
|
||||
@@ -4842,7 +4842,7 @@ testRun(void)
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName), "new read file (defaults)");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
TEST_ERROR(
|
||||
storageReadSftpClose(ioReadDriver(storageReadIo(file))), FileCloseError,
|
||||
storageReadSftpClose(file->driver), FileCloseError,
|
||||
"unable to close file '" TEST_PATH "/readtest.txt' after read: libssh2 errno [-29]");
|
||||
|
||||
memContextFree(objMemContext((StorageSftp *)storageDriver(storageTest)));
|
||||
@@ -4874,7 +4874,7 @@ testRun(void)
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName), "new read file (defaults)");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
TEST_ERROR(
|
||||
storageReadSftp(ioReadDriver(storageReadIo(file)), outBuffer, false), FileReadError,
|
||||
storageReadSftp(file->driver, outBuffer, false), FileReadError,
|
||||
"unable to read '" TEST_PATH "/readtest.txt': sftp errno [4]");
|
||||
|
||||
memContextFree(objMemContext((StorageSftp *)storageDriver(storageTest)));
|
||||
@@ -4906,7 +4906,7 @@ testRun(void)
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName), "new read file (defaults)");
|
||||
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
|
||||
TEST_ERROR(
|
||||
storageReadSftp(ioReadDriver(storageReadIo(file)), outBuffer, false), FileReadError,
|
||||
storageReadSftp(file->driver, outBuffer, false), FileReadError,
|
||||
"unable to read '" TEST_PATH "/readtest.txt': libssh2 error [-29]");
|
||||
|
||||
memContextFree(objMemContext((StorageSftp *)storageDriver(storageTest)));
|
||||
@@ -6761,14 +6761,14 @@ testRun(void)
|
||||
TEST_RESULT_VOID(ioRead(storageReadIo(file), outBuffer), "no data to load");
|
||||
TEST_RESULT_UINT(bufUsed(outBuffer), 0, "buffer is empty");
|
||||
|
||||
TEST_RESULT_VOID(storageReadSftp(ioReadDriver(storageReadIo(file)), outBuffer, true), "no data to load from driver either");
|
||||
TEST_RESULT_VOID(storageReadSftp(file->driver, outBuffer, true), "no data to load from driver either");
|
||||
TEST_RESULT_UINT(bufUsed(outBuffer), 0, "buffer is empty");
|
||||
|
||||
TEST_RESULT_BOOL(bufEq(buffer, expectedBuffer), true, "check file contents (all loaded)");
|
||||
|
||||
TEST_RESULT_BOOL(ioReadEof(storageReadIo(file)), true, "eof");
|
||||
TEST_RESULT_BOOL(ioReadEof(storageReadIo(file)), true, "still eof");
|
||||
TEST_RESULT_BOOL(storageReadSftpEof(ioReadDriver(storageReadIo(file))), true, "storageReadSftpEof eof true");
|
||||
TEST_RESULT_BOOL(storageReadSftpEof(file->driver), true, "storageReadSftpEof eof true");
|
||||
|
||||
TEST_RESULT_VOID(ioReadClose(storageReadIo(file)), "close file");
|
||||
|
||||
|
Reference in New Issue
Block a user