1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2024-12-12 10:04:14 +02:00

Allow storage reads to be limited by bytes.

The current use case is reading files from the PostgreSQL cluster during backup.

A file may grow during backup but we only need to copy the number of bytes that were reported during the manifest build.  The rest will be rebuilt from the WAL during recovery so copying more is just a waste of space.

Limiting the copy sizes in backup will be part of a future commit.
This commit is contained in:
David Steele 2020-03-17 18:16:17 -04:00
parent 307e741298
commit f2548f45ce
15 changed files with 86 additions and 17 deletions

View File

@ -28,6 +28,8 @@ typedef struct StorageReadPosix
StoragePosix *storage; // Storage that created this object
int handle;
uint64_t current; // Current bytes read from file
uint64_t limit; // Limit bytes to be read from file
bool eof;
} StorageReadPosix;
@ -112,8 +114,13 @@ storageReadPosix(THIS_VOID, Buffer *buffer, bool block)
if (!this->eof)
{
// Read and handle errors
// Determine expected bytes to read. If remaining size in the buffer would exceed the limit then reduce the expected read.
size_t expectedBytes = bufRemains(buffer);
if (this->current + expectedBytes > this->limit)
expectedBytes = (size_t)(this->limit - this->current);
// Read from file
actualBytes = read(this->handle, bufRemainsPtr(buffer), expectedBytes);
// Error occurred during read
@ -122,10 +129,11 @@ storageReadPosix(THIS_VOID, Buffer *buffer, bool block)
// Update amount of buffer used
bufUsedInc(buffer, (size_t)actualBytes);
this->current += (uint64_t)actualBytes;
// If less data than expected was read then EOF. The file may not actually be EOF but we are not concerned with files that
// are growing. Just read up to the point where the file is being extended.
if ((size_t)actualBytes != expectedBytes)
// If less data than expected was read or the limit has been reached then EOF. The file may not actually be EOF but we are
// not concerned with files that are growing. Just read up to the point where the file is being extended.
if ((size_t)actualBytes != expectedBytes || this->current == this->limit)
this->eof = true;
}
@ -191,11 +199,12 @@ storageReadPosixHandle(const THIS_VOID)
New object
***********************************************************************************************************************************/
StorageRead *
storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing)
storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing, const Variant *limit)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STRING, name);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(VARIANT, limit);
FUNCTION_LOG_END();
ASSERT(name != NULL);
@ -212,11 +221,15 @@ storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissin
.storage = storage,
.handle = -1,
// Rather than enable/disable limit checking just always use a limit
.limit = limit == NULL ? UINT64_MAX : varUInt64(limit),
.interface = (StorageReadInterface)
{
.type = STORAGE_POSIX_TYPE_STR,
.name = strDup(name),
.ignoreMissing = ignoreMissing,
.limit = varDup(limit),
.ioInterface = (IoReadInterface)
{

View File

@ -10,6 +10,6 @@ Posix Storage Read
/***********************************************************************************************************************************
Constructor
***********************************************************************************************************************************/
StorageRead *storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing);
StorageRead *storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing, const Variant *limit);
#endif

View File

@ -389,13 +389,13 @@ storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageIn
FUNCTION_LOG_PARAM(STORAGE_POSIX, this);
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
(void)param; // No parameters are used
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(file != NULL);
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadPosixNew(this, file, ignoreMissing));
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadPosixNew(this, file, ignoreMissing, param.limit));
}
/**********************************************************************************************************************************/
@ -635,7 +635,7 @@ New object
***********************************************************************************************************************************/
static const StorageInterface storageInterfacePosix =
{
.feature = (1 << storageFeaturePath | 1 << storageFeatureCompress),
.feature = 1 << storageFeaturePath | 1 << storageFeatureCompress | 1 << storageFeatureLimitRead,
.exists = storagePosixExists,
.info = storagePosixInfo,

View File

@ -90,6 +90,19 @@ storageReadIgnoreMissing(const StorageRead *this)
FUNCTION_TEST_RETURN(this->interface->ignoreMissing);
}
/**********************************************************************************************************************************/
const Variant *
storageReadLimit(const StorageRead *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(STORAGE_READ, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->interface->limit);
}
/***********************************************************************************************************************************
Get io interface
***********************************************************************************************************************************/

View File

@ -23,6 +23,10 @@ StorageRead *storageReadMove(StorageRead *this, MemContext *parentNew);
Getters
***********************************************************************************************************************************/
IoRead *storageReadIo(const StorageRead *this);
// Is there a read limit?
const Variant *storageReadLimit(const StorageRead *this);
bool storageReadIgnoreMissing(const StorageRead *this);
const String *storageReadName(const StorageRead *this);
const String *storageReadType(const StorageRead *this);

View File

@ -17,6 +17,7 @@ typedef struct StorageReadInterface
bool compressible; // Is this file compressible?
unsigned int compressLevel; // Level to use for compression
bool ignoreMissing;
const Variant *limit; // Limit how many bytes are read
IoReadInterface ioInterface;
} StorageReadInterface;

View File

@ -246,10 +246,11 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto
{
// Create the read object
IoRead *fileRead = storageReadIo(
storageInterfaceNewReadP(driver, varStr(varLstGet(paramList, 0)), varBool(varLstGet(paramList, 1))));
storageInterfaceNewReadP(
driver, varStr(varLstGet(paramList, 0)), varBool(varLstGet(paramList, 1)), .limit = varLstGet(paramList, 2)));
// Set filter group based on passed filters
storageRemoteFilterGroup(ioReadFilterGroup(fileRead), varLstGet(paramList, 2));
storageRemoteFilterGroup(ioReadFilterGroup(fileRead), varLstGet(paramList, 3));
// Check if the file exists
bool exists = ioReadOpen(fileRead);

View File

@ -72,6 +72,7 @@ storageReadRemoteOpen(THIS_VOID)
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR);
protocolCommandParamAdd(command, VARSTR(this->interface.name));
protocolCommandParamAdd(command, VARBOOL(this->interface.ignoreMissing));
protocolCommandParamAdd(command, this->interface.limit);
protocolCommandParamAdd(command, ioFilterGroupParamAll(ioReadFilterGroup(storageReadIo(this->read))));
result = varBool(protocolClientExecute(this->client, command, true));
@ -178,7 +179,7 @@ New object
StorageRead *
storageReadRemoteNew(
StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
unsigned int compressLevel)
unsigned int compressLevel, const Variant *limit)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage);
@ -187,6 +188,7 @@ storageReadRemoteNew(
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(BOOL, compressible);
FUNCTION_LOG_PARAM(UINT, compressLevel);
FUNCTION_LOG_PARAM(VARIANT, limit);
FUNCTION_LOG_END();
ASSERT(storage != NULL);
@ -212,6 +214,7 @@ storageReadRemoteNew(
.compressible = compressible,
.compressLevel = compressLevel,
.ignoreMissing = ignoreMissing,
.limit = varDup(limit),
.ioInterface = (IoReadInterface)
{

View File

@ -13,6 +13,6 @@ Constructor
***********************************************************************************************************************************/
StorageRead *storageReadRemoteNew(
StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
unsigned int compressLevel);
unsigned int compressLevel, const Variant *limit);
#endif

View File

@ -251,6 +251,7 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageI
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(BOOL, param.compressible);
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
ASSERT(this != NULL);
@ -259,7 +260,8 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageI
FUNCTION_LOG_RETURN(
STORAGE_READ,
storageReadRemoteNew(
this, this->client, file, ignoreMissing, this->compressLevel > 0 ? param.compressible : false, this->compressLevel));
this, this->client, file, ignoreMissing, this->compressLevel > 0 ? param.compressible : false, this->compressLevel,
param.limit));
}
/**********************************************************************************************************************************/

View File

@ -597,9 +597,12 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p
FUNCTION_LOG_PARAM(STRING, fileExp);
FUNCTION_LOG_PARAM(BOOL, param.ignoreMissing);
FUNCTION_LOG_PARAM(BOOL, param.compressible);
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(storageFeature(this, storageFeatureLimitRead) || param.limit == NULL);
ASSERT(param.limit == NULL || varType(param.limit) == varTypeUInt64);
StorageRead *result = NULL;
@ -607,7 +610,8 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p
{
result = storageReadMove(
storageInterfaceNewReadP(
this->driver, storagePathP(this, fileExp), param.ignoreMissing, .compressible = param.compressible),
this->driver, storagePathP(this, fileExp), param.ignoreMissing, .compressible = param.compressible,
.limit = param.limit),
memContextPrior());
}
MEM_CONTEXT_TEMP_END();

View File

@ -45,6 +45,9 @@ typedef enum
// Does the storage support hardlinks? Hardlinks allow the same file to be linked into multiple paths to save space.
storageFeatureHardLink,
// Can the storage limit the amount of data read from a file?
storageFeatureLimitRead,
// Does the storage support symlinks? Symlinks allow paths/files/links to be accessed from another path.
storageFeatureSymLink,
} StorageFeature;
@ -153,6 +156,7 @@ typedef struct StorageNewReadParam
VAR_PARAM_HEADER;
bool ignoreMissing;
bool compressible;
const Variant *limit; // Limit bytes to read from the file (must be varTypeUInt64)
} StorageNewReadParam;
#define storageNewReadP(this, pathExp, ...) \

View File

@ -104,6 +104,9 @@ typedef struct StorageInterfaceNewReadParam
// Is the file compressible? This is useful when the file must be moved across a network and some temporary compression is
// helpful.
bool compressible;
// Limit bytes read from the file
const Variant *limit;
} StorageInterfaceNewReadParam;
typedef StorageRead *StorageInterfaceNewRead(

View File

@ -811,6 +811,16 @@ testRun(void)
TEST_ASSIGN(buffer, storageGetP(storageNewReadP(storageTest, strNewFmt("%s/test.txt", testPath()))), "get text");
TEST_RESULT_INT(bufSize(buffer), 9, "check size");
TEST_RESULT_BOOL(memcmp(bufPtr(buffer), "TESTFILE\n", bufSize(buffer)) == 0, true, "check content");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("read limited bytes");
ioBufferSizeSet(2);
TEST_ASSIGN(
buffer, storageGetP(storageNewReadP(storageTest, strNewFmt("%s/test.txt", testPath()), .limit = VARUINT64(7))), "get");
TEST_RESULT_INT(bufSize(buffer), 7, "check size");
TEST_RESULT_BOOL(memcmp(bufPtr(buffer), "TESTFIL", bufSize(buffer)) == 0, true, "check content");
}
// *****************************************************************************************************************************
@ -842,10 +852,11 @@ testRun(void)
TEST_CREATE_NOPERM();
StorageRead *file = NULL;
TEST_ASSIGN(file, storageNewReadP(storageTest, fileNoPerm, .ignoreMissing = true), "new read file");
TEST_ASSIGN(file, storageNewReadP(storageTest, fileNoPerm, .ignoreMissing = true, .limit = VARUINT64(44)), "new read file");
TEST_RESULT_PTR(storageRead(file), file->driver, " check driver");
TEST_RESULT_BOOL(storageReadIgnoreMissing(file), true, " check ignore missing");
TEST_RESULT_STR(storageReadName(file), fileNoPerm, " check name");
TEST_RESULT_UINT(varUInt64(storageReadLimit(file)), 44, " check limit");
// -------------------------------------------------------------------------------------------------------------------------
TEST_ASSIGN(file, storageNewReadP(storageTest, fileNoPerm), "new no perm read file");

View File

@ -403,6 +403,10 @@ testRun(void)
TEST_RESULT_BOOL(bufEq(storageGetP(fileRead), contentBuf), true, " check contents");
TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, bufSize(contentBuf), " check read size");
TEST_ASSIGN(fileRead, storageNewReadP(storageRemote, strNew("test.txt"), .limit = VARUINT64(11)), "get file");
TEST_RESULT_STR_Z(strNewBuf(storageGetP(fileRead)), "BABABABABAB", " check contents");
TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, 11, " check read size");
// Enable protocol compression in the storage object
((StorageRemote *)storageRemote->driver)->compressLevel = 3;
@ -422,6 +426,7 @@ testRun(void)
VariantList *paramList = varLstNew();
varLstAdd(paramList, varNewStr(strNew("missing.txt")));
varLstAdd(paramList, varNewBool(true));
varLstAdd(paramList, NULL);
varLstAdd(paramList, varNewVarLst(varLstNew()));
TEST_RESULT_BOOL(
@ -433,12 +438,13 @@ testRun(void)
// Check protocol function directly (file exists)
// -------------------------------------------------------------------------------------------------------------------------
storagePutP(storageNewWriteP(storageTest, strNew("repo/test.txt")), BUFSTRDEF("TESTDATA"));
storagePutP(storageNewWriteP(storageTest, strNew("repo/test.txt")), BUFSTRDEF("TESTDATA!"));
ioBufferSizeSet(4);
paramList = varLstNew();
varLstAdd(paramList, varNewStr(strNewFmt("%s/repo/test.txt", testPath())));
varLstAdd(paramList, varNewBool(false));
varLstAdd(paramList, varNewUInt64(8));
// Create filters to test filter logic
IoFilterGroup *filterGroup = ioFilterGroupNew();
@ -469,9 +475,12 @@ testRun(void)
// Check protocol function directly (file exists but all data goes to sink)
// -------------------------------------------------------------------------------------------------------------------------
storagePutP(storageNewWriteP(storageTest, strNew("repo/test.txt")), BUFSTRDEF("TESTDATA"));
paramList = varLstNew();
varLstAdd(paramList, varNewStr(strNewFmt("%s/repo/test.txt", testPath())));
varLstAdd(paramList, varNewBool(false));
varLstAdd(paramList, NULL);
// Create filters to test filter logic
filterGroup = ioFilterGroupNew();
@ -496,6 +505,7 @@ testRun(void)
paramList = varLstNew();
varLstAdd(paramList, varNewStr(strNewFmt("%s/repo/test.txt", testPath())));
varLstAdd(paramList, varNewBool(false));
varLstAdd(paramList, NULL);
varLstAdd(paramList, varNewVarLst(varLstAdd(varLstNew(), varNewKv(kvAdd(kvNew(), varNewStrZ("bogus"), NULL)))));
TEST_ERROR(