From f2548f45ce4c1e444e9d3175b2349e0a97b2783a Mon Sep 17 00:00:00 2001 From: David Steele Date: Tue, 17 Mar 2020 18:16:17 -0400 Subject: [PATCH] Allow storage reads to be limited by bytes. The current use case is reading files from the PostgreSQL cluster during backup. A file may grow during backup but we only need to copy the number of bytes that were reported during the manifest build. The rest will be rebuilt from the WAL during recovery so copying more is just a waste of space. Limiting the copy sizes in backup will be part of a future commit. --- src/storage/posix/read.c | 23 ++++++++++++++++++----- src/storage/posix/read.h | 2 +- src/storage/posix/storage.c | 6 +++--- src/storage/read.c | 13 +++++++++++++ src/storage/read.h | 4 ++++ src/storage/read.intern.h | 1 + src/storage/remote/protocol.c | 5 +++-- src/storage/remote/read.c | 5 ++++- src/storage/remote/read.h | 2 +- src/storage/remote/storage.c | 4 +++- src/storage/storage.c | 6 +++++- src/storage/storage.h | 4 ++++ src/storage/storage.intern.h | 3 +++ test/src/module/storage/posixTest.c | 13 ++++++++++++- test/src/module/storage/remoteTest.c | 12 +++++++++++- 15 files changed, 86 insertions(+), 17 deletions(-) diff --git a/src/storage/posix/read.c b/src/storage/posix/read.c index 6b517a580..fe16e5212 100644 --- a/src/storage/posix/read.c +++ b/src/storage/posix/read.c @@ -28,6 +28,8 @@ typedef struct StorageReadPosix StoragePosix *storage; // Storage that created this object int handle; + uint64_t current; // Current bytes read from file + uint64_t limit; // Limit bytes to be read from file bool eof; } StorageReadPosix; @@ -112,8 +114,13 @@ storageReadPosix(THIS_VOID, Buffer *buffer, bool block) if (!this->eof) { - // Read and handle errors + // Determine expected bytes to read. If remaining size in the buffer would exceed the limit then reduce the expected read. size_t expectedBytes = bufRemains(buffer); + + if (this->current + expectedBytes > this->limit) + expectedBytes = (size_t)(this->limit - this->current); + + // Read from file actualBytes = read(this->handle, bufRemainsPtr(buffer), expectedBytes); // Error occurred during read @@ -122,10 +129,11 @@ storageReadPosix(THIS_VOID, Buffer *buffer, bool block) // Update amount of buffer used bufUsedInc(buffer, (size_t)actualBytes); + this->current += (uint64_t)actualBytes; - // If less data than expected was read then EOF. The file may not actually be EOF but we are not concerned with files that - // are growing. Just read up to the point where the file is being extended. - if ((size_t)actualBytes != expectedBytes) + // If less data than expected was read or the limit has been reached then EOF. The file may not actually be EOF but we are + // not concerned with files that are growing. Just read up to the point where the file is being extended. + if ((size_t)actualBytes != expectedBytes || this->current == this->limit) this->eof = true; } @@ -191,11 +199,12 @@ storageReadPosixHandle(const THIS_VOID) New object ***********************************************************************************************************************************/ StorageRead * -storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing) +storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing, const Variant *limit) { FUNCTION_LOG_BEGIN(logLevelTrace); FUNCTION_LOG_PARAM(STRING, name); FUNCTION_LOG_PARAM(BOOL, ignoreMissing); + FUNCTION_LOG_PARAM(VARIANT, limit); FUNCTION_LOG_END(); ASSERT(name != NULL); @@ -212,11 +221,15 @@ storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissin .storage = storage, .handle = -1, + // Rather than enable/disable limit checking just always use a limit + .limit = limit == NULL ? UINT64_MAX : varUInt64(limit), + .interface = (StorageReadInterface) { .type = STORAGE_POSIX_TYPE_STR, .name = strDup(name), .ignoreMissing = ignoreMissing, + .limit = varDup(limit), .ioInterface = (IoReadInterface) { diff --git a/src/storage/posix/read.h b/src/storage/posix/read.h index 82ede8441..ac0e3a857 100644 --- a/src/storage/posix/read.h +++ b/src/storage/posix/read.h @@ -10,6 +10,6 @@ Posix Storage Read /*********************************************************************************************************************************** Constructor ***********************************************************************************************************************************/ -StorageRead *storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing); +StorageRead *storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing, const Variant *limit); #endif diff --git a/src/storage/posix/storage.c b/src/storage/posix/storage.c index 33b7177d7..3fbaf866c 100644 --- a/src/storage/posix/storage.c +++ b/src/storage/posix/storage.c @@ -389,13 +389,13 @@ storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageIn FUNCTION_LOG_PARAM(STORAGE_POSIX, this); FUNCTION_LOG_PARAM(STRING, file); FUNCTION_LOG_PARAM(BOOL, ignoreMissing); - (void)param; // No parameters are used + FUNCTION_LOG_PARAM(VARIANT, param.limit); FUNCTION_LOG_END(); ASSERT(this != NULL); ASSERT(file != NULL); - FUNCTION_LOG_RETURN(STORAGE_READ, storageReadPosixNew(this, file, ignoreMissing)); + FUNCTION_LOG_RETURN(STORAGE_READ, storageReadPosixNew(this, file, ignoreMissing, param.limit)); } /**********************************************************************************************************************************/ @@ -635,7 +635,7 @@ New object ***********************************************************************************************************************************/ static const StorageInterface storageInterfacePosix = { - .feature = (1 << storageFeaturePath | 1 << storageFeatureCompress), + .feature = 1 << storageFeaturePath | 1 << storageFeatureCompress | 1 << storageFeatureLimitRead, .exists = storagePosixExists, .info = storagePosixInfo, diff --git a/src/storage/read.c b/src/storage/read.c index 49d903d61..8bd051c25 100644 --- a/src/storage/read.c +++ b/src/storage/read.c @@ -90,6 +90,19 @@ storageReadIgnoreMissing(const StorageRead *this) FUNCTION_TEST_RETURN(this->interface->ignoreMissing); } +/**********************************************************************************************************************************/ +const Variant * +storageReadLimit(const StorageRead *this) +{ + FUNCTION_TEST_BEGIN(); + FUNCTION_TEST_PARAM(STORAGE_READ, this); + FUNCTION_TEST_END(); + + ASSERT(this != NULL); + + FUNCTION_TEST_RETURN(this->interface->limit); +} + /*********************************************************************************************************************************** Get io interface ***********************************************************************************************************************************/ diff --git a/src/storage/read.h b/src/storage/read.h index 5b99667eb..15316c875 100644 --- a/src/storage/read.h +++ b/src/storage/read.h @@ -23,6 +23,10 @@ StorageRead *storageReadMove(StorageRead *this, MemContext *parentNew); Getters ***********************************************************************************************************************************/ IoRead *storageReadIo(const StorageRead *this); + +// Is there a read limit? +const Variant *storageReadLimit(const StorageRead *this); + bool storageReadIgnoreMissing(const StorageRead *this); const String *storageReadName(const StorageRead *this); const String *storageReadType(const StorageRead *this); diff --git a/src/storage/read.intern.h b/src/storage/read.intern.h index d62daf3af..14bc99d52 100644 --- a/src/storage/read.intern.h +++ b/src/storage/read.intern.h @@ -17,6 +17,7 @@ typedef struct StorageReadInterface bool compressible; // Is this file compressible? unsigned int compressLevel; // Level to use for compression bool ignoreMissing; + const Variant *limit; // Limit how many bytes are read IoReadInterface ioInterface; } StorageReadInterface; diff --git a/src/storage/remote/protocol.c b/src/storage/remote/protocol.c index 2641cb8d8..e8ca4def4 100644 --- a/src/storage/remote/protocol.c +++ b/src/storage/remote/protocol.c @@ -246,10 +246,11 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto { // Create the read object IoRead *fileRead = storageReadIo( - storageInterfaceNewReadP(driver, varStr(varLstGet(paramList, 0)), varBool(varLstGet(paramList, 1)))); + storageInterfaceNewReadP( + driver, varStr(varLstGet(paramList, 0)), varBool(varLstGet(paramList, 1)), .limit = varLstGet(paramList, 2))); // Set filter group based on passed filters - storageRemoteFilterGroup(ioReadFilterGroup(fileRead), varLstGet(paramList, 2)); + storageRemoteFilterGroup(ioReadFilterGroup(fileRead), varLstGet(paramList, 3)); // Check if the file exists bool exists = ioReadOpen(fileRead); diff --git a/src/storage/remote/read.c b/src/storage/remote/read.c index 95e16785a..dfb941705 100644 --- a/src/storage/remote/read.c +++ b/src/storage/remote/read.c @@ -72,6 +72,7 @@ storageReadRemoteOpen(THIS_VOID) ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR); protocolCommandParamAdd(command, VARSTR(this->interface.name)); protocolCommandParamAdd(command, VARBOOL(this->interface.ignoreMissing)); + protocolCommandParamAdd(command, this->interface.limit); protocolCommandParamAdd(command, ioFilterGroupParamAll(ioReadFilterGroup(storageReadIo(this->read)))); result = varBool(protocolClientExecute(this->client, command, true)); @@ -178,7 +179,7 @@ New object StorageRead * storageReadRemoteNew( StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible, - unsigned int compressLevel) + unsigned int compressLevel, const Variant *limit) { FUNCTION_LOG_BEGIN(logLevelTrace); FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage); @@ -187,6 +188,7 @@ storageReadRemoteNew( FUNCTION_LOG_PARAM(BOOL, ignoreMissing); FUNCTION_LOG_PARAM(BOOL, compressible); FUNCTION_LOG_PARAM(UINT, compressLevel); + FUNCTION_LOG_PARAM(VARIANT, limit); FUNCTION_LOG_END(); ASSERT(storage != NULL); @@ -212,6 +214,7 @@ storageReadRemoteNew( .compressible = compressible, .compressLevel = compressLevel, .ignoreMissing = ignoreMissing, + .limit = varDup(limit), .ioInterface = (IoReadInterface) { diff --git a/src/storage/remote/read.h b/src/storage/remote/read.h index 277f1f61e..c0856e7ba 100644 --- a/src/storage/remote/read.h +++ b/src/storage/remote/read.h @@ -13,6 +13,6 @@ Constructor ***********************************************************************************************************************************/ StorageRead *storageReadRemoteNew( StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible, - unsigned int compressLevel); + unsigned int compressLevel, const Variant *limit); #endif diff --git a/src/storage/remote/storage.c b/src/storage/remote/storage.c index e05f55d91..3ac8165af 100644 --- a/src/storage/remote/storage.c +++ b/src/storage/remote/storage.c @@ -251,6 +251,7 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageI FUNCTION_LOG_PARAM(STRING, file); FUNCTION_LOG_PARAM(BOOL, ignoreMissing); FUNCTION_LOG_PARAM(BOOL, param.compressible); + FUNCTION_LOG_PARAM(VARIANT, param.limit); FUNCTION_LOG_END(); ASSERT(this != NULL); @@ -259,7 +260,8 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageI FUNCTION_LOG_RETURN( STORAGE_READ, storageReadRemoteNew( - this, this->client, file, ignoreMissing, this->compressLevel > 0 ? param.compressible : false, this->compressLevel)); + this, this->client, file, ignoreMissing, this->compressLevel > 0 ? param.compressible : false, this->compressLevel, + param.limit)); } /**********************************************************************************************************************************/ diff --git a/src/storage/storage.c b/src/storage/storage.c index 0af9e00c7..e108a9f5d 100644 --- a/src/storage/storage.c +++ b/src/storage/storage.c @@ -597,9 +597,12 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p FUNCTION_LOG_PARAM(STRING, fileExp); FUNCTION_LOG_PARAM(BOOL, param.ignoreMissing); FUNCTION_LOG_PARAM(BOOL, param.compressible); + FUNCTION_LOG_PARAM(VARIANT, param.limit); FUNCTION_LOG_END(); ASSERT(this != NULL); + ASSERT(storageFeature(this, storageFeatureLimitRead) || param.limit == NULL); + ASSERT(param.limit == NULL || varType(param.limit) == varTypeUInt64); StorageRead *result = NULL; @@ -607,7 +610,8 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p { result = storageReadMove( storageInterfaceNewReadP( - this->driver, storagePathP(this, fileExp), param.ignoreMissing, .compressible = param.compressible), + this->driver, storagePathP(this, fileExp), param.ignoreMissing, .compressible = param.compressible, + .limit = param.limit), memContextPrior()); } MEM_CONTEXT_TEMP_END(); diff --git a/src/storage/storage.h b/src/storage/storage.h index da24f0dfd..d8e495140 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -45,6 +45,9 @@ typedef enum // Does the storage support hardlinks? Hardlinks allow the same file to be linked into multiple paths to save space. storageFeatureHardLink, + // Can the storage limit the amount of data read from a file? + storageFeatureLimitRead, + // Does the storage support symlinks? Symlinks allow paths/files/links to be accessed from another path. storageFeatureSymLink, } StorageFeature; @@ -153,6 +156,7 @@ typedef struct StorageNewReadParam VAR_PARAM_HEADER; bool ignoreMissing; bool compressible; + const Variant *limit; // Limit bytes to read from the file (must be varTypeUInt64) } StorageNewReadParam; #define storageNewReadP(this, pathExp, ...) \ diff --git a/src/storage/storage.intern.h b/src/storage/storage.intern.h index 7d805a75a..ea1e94328 100644 --- a/src/storage/storage.intern.h +++ b/src/storage/storage.intern.h @@ -104,6 +104,9 @@ typedef struct StorageInterfaceNewReadParam // Is the file compressible? This is useful when the file must be moved across a network and some temporary compression is // helpful. bool compressible; + + // Limit bytes read from the file + const Variant *limit; } StorageInterfaceNewReadParam; typedef StorageRead *StorageInterfaceNewRead( diff --git a/test/src/module/storage/posixTest.c b/test/src/module/storage/posixTest.c index 8bad3e38c..b5953d7e3 100644 --- a/test/src/module/storage/posixTest.c +++ b/test/src/module/storage/posixTest.c @@ -811,6 +811,16 @@ testRun(void) TEST_ASSIGN(buffer, storageGetP(storageNewReadP(storageTest, strNewFmt("%s/test.txt", testPath()))), "get text"); TEST_RESULT_INT(bufSize(buffer), 9, "check size"); TEST_RESULT_BOOL(memcmp(bufPtr(buffer), "TESTFILE\n", bufSize(buffer)) == 0, true, "check content"); + + // ------------------------------------------------------------------------------------------------------------------------- + TEST_TITLE("read limited bytes"); + + ioBufferSizeSet(2); + + TEST_ASSIGN( + buffer, storageGetP(storageNewReadP(storageTest, strNewFmt("%s/test.txt", testPath()), .limit = VARUINT64(7))), "get"); + TEST_RESULT_INT(bufSize(buffer), 7, "check size"); + TEST_RESULT_BOOL(memcmp(bufPtr(buffer), "TESTFIL", bufSize(buffer)) == 0, true, "check content"); } // ***************************************************************************************************************************** @@ -842,10 +852,11 @@ testRun(void) TEST_CREATE_NOPERM(); StorageRead *file = NULL; - TEST_ASSIGN(file, storageNewReadP(storageTest, fileNoPerm, .ignoreMissing = true), "new read file"); + TEST_ASSIGN(file, storageNewReadP(storageTest, fileNoPerm, .ignoreMissing = true, .limit = VARUINT64(44)), "new read file"); TEST_RESULT_PTR(storageRead(file), file->driver, " check driver"); TEST_RESULT_BOOL(storageReadIgnoreMissing(file), true, " check ignore missing"); TEST_RESULT_STR(storageReadName(file), fileNoPerm, " check name"); + TEST_RESULT_UINT(varUInt64(storageReadLimit(file)), 44, " check limit"); // ------------------------------------------------------------------------------------------------------------------------- TEST_ASSIGN(file, storageNewReadP(storageTest, fileNoPerm), "new no perm read file"); diff --git a/test/src/module/storage/remoteTest.c b/test/src/module/storage/remoteTest.c index e7c866ebf..2b9c39922 100644 --- a/test/src/module/storage/remoteTest.c +++ b/test/src/module/storage/remoteTest.c @@ -403,6 +403,10 @@ testRun(void) TEST_RESULT_BOOL(bufEq(storageGetP(fileRead), contentBuf), true, " check contents"); TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, bufSize(contentBuf), " check read size"); + TEST_ASSIGN(fileRead, storageNewReadP(storageRemote, strNew("test.txt"), .limit = VARUINT64(11)), "get file"); + TEST_RESULT_STR_Z(strNewBuf(storageGetP(fileRead)), "BABABABABAB", " check contents"); + TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, 11, " check read size"); + // Enable protocol compression in the storage object ((StorageRemote *)storageRemote->driver)->compressLevel = 3; @@ -422,6 +426,7 @@ testRun(void) VariantList *paramList = varLstNew(); varLstAdd(paramList, varNewStr(strNew("missing.txt"))); varLstAdd(paramList, varNewBool(true)); + varLstAdd(paramList, NULL); varLstAdd(paramList, varNewVarLst(varLstNew())); TEST_RESULT_BOOL( @@ -433,12 +438,13 @@ testRun(void) // Check protocol function directly (file exists) // ------------------------------------------------------------------------------------------------------------------------- - storagePutP(storageNewWriteP(storageTest, strNew("repo/test.txt")), BUFSTRDEF("TESTDATA")); + storagePutP(storageNewWriteP(storageTest, strNew("repo/test.txt")), BUFSTRDEF("TESTDATA!")); ioBufferSizeSet(4); paramList = varLstNew(); varLstAdd(paramList, varNewStr(strNewFmt("%s/repo/test.txt", testPath()))); varLstAdd(paramList, varNewBool(false)); + varLstAdd(paramList, varNewUInt64(8)); // Create filters to test filter logic IoFilterGroup *filterGroup = ioFilterGroupNew(); @@ -469,9 +475,12 @@ testRun(void) // Check protocol function directly (file exists but all data goes to sink) // ------------------------------------------------------------------------------------------------------------------------- + storagePutP(storageNewWriteP(storageTest, strNew("repo/test.txt")), BUFSTRDEF("TESTDATA")); + paramList = varLstNew(); varLstAdd(paramList, varNewStr(strNewFmt("%s/repo/test.txt", testPath()))); varLstAdd(paramList, varNewBool(false)); + varLstAdd(paramList, NULL); // Create filters to test filter logic filterGroup = ioFilterGroupNew(); @@ -496,6 +505,7 @@ testRun(void) paramList = varLstNew(); varLstAdd(paramList, varNewStr(strNewFmt("%s/repo/test.txt", testPath()))); varLstAdd(paramList, varNewBool(false)); + varLstAdd(paramList, NULL); varLstAdd(paramList, varNewVarLst(varLstAdd(varLstNew(), varNewKv(kvAdd(kvNew(), varNewStrZ("bogus"), NULL))))); TEST_ERROR(