You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-07-13 01:00:23 +02:00
Add limit parameter to ioCopyP().
Allows the number of bytes copied to be limited.
This commit is contained in:
@ -28,6 +28,19 @@
|
|||||||
<p>Increase precision of percent complete logging for <cmd>backup</cmd> and <cmd>restore</cmd>.</p>
|
<p>Increase precision of percent complete logging for <cmd>backup</cmd> and <cmd>restore</cmd>.</p>
|
||||||
</release-item>
|
</release-item>
|
||||||
</release-improvement-list>
|
</release-improvement-list>
|
||||||
|
|
||||||
|
<release-development-list>
|
||||||
|
<release-item>
|
||||||
|
<commit subject="Remove redundant restoreFile() test and improve coverage."/>
|
||||||
|
<commit subject="Add limit parameter to ioCopyP()."/>
|
||||||
|
|
||||||
|
<release-item-contributor-list>
|
||||||
|
<release-item-contributor id="david.steele"/>
|
||||||
|
</release-item-contributor-list>
|
||||||
|
|
||||||
|
<p>Improve small file support.</p>
|
||||||
|
</release-item>
|
||||||
|
</release-development-list>
|
||||||
</release-core-list>
|
</release-core-list>
|
||||||
|
|
||||||
<release-doc-list>
|
<release-doc-list>
|
||||||
|
@ -254,7 +254,7 @@ backupFile(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Copy data from source to destination
|
// Copy data from source to destination
|
||||||
ioCopy(storageReadIo(read), storageWriteIo(write));
|
ioCopyP(storageReadIo(read), storageWriteIo(write));
|
||||||
|
|
||||||
// Close the source
|
// Close the source
|
||||||
ioReadClose(storageReadIo(read));
|
ioReadClose(storageReadIo(read));
|
||||||
|
@ -157,7 +157,7 @@ storageGetProcess(IoWrite *destination)
|
|||||||
ioWriteOpen(destination);
|
ioWriteOpen(destination);
|
||||||
|
|
||||||
// Copy data from source to destination
|
// Copy data from source to destination
|
||||||
ioCopy(source, destination);
|
ioCopyP(source, destination);
|
||||||
|
|
||||||
// Close the source and destination
|
// Close the source and destination
|
||||||
ioReadClose(source);
|
ioReadClose(source);
|
||||||
|
@ -61,7 +61,7 @@ storagePutProcess(IoRead *source)
|
|||||||
ioWriteOpen(storageWriteIo(destination));
|
ioWriteOpen(storageWriteIo(destination));
|
||||||
|
|
||||||
// Copy data from source to destination
|
// Copy data from source to destination
|
||||||
ioCopy(source, storageWriteIo(destination));
|
ioCopyP(source, storageWriteIo(destination));
|
||||||
|
|
||||||
// Close the source and destination
|
// Close the source and destination
|
||||||
ioReadClose(source);
|
ioReadClose(source);
|
||||||
|
@ -97,24 +97,43 @@ ioReadBuf(IoRead *read)
|
|||||||
|
|
||||||
/**********************************************************************************************************************************/
|
/**********************************************************************************************************************************/
|
||||||
void
|
void
|
||||||
ioCopy(IoRead *const source, IoWrite *const destination)
|
ioCopy(IoRead *const source, IoWrite *const destination, const IoCopyParam param)
|
||||||
{
|
{
|
||||||
FUNCTION_TEST_BEGIN();
|
FUNCTION_TEST_BEGIN();
|
||||||
FUNCTION_TEST_PARAM(IO_READ, source);
|
FUNCTION_TEST_PARAM(IO_READ, source);
|
||||||
FUNCTION_TEST_PARAM(IO_WRITE, destination);
|
FUNCTION_TEST_PARAM(IO_WRITE, destination);
|
||||||
|
FUNCTION_TEST_PARAM(VARIANT, param.limit);
|
||||||
FUNCTION_TEST_END();
|
FUNCTION_TEST_END();
|
||||||
|
|
||||||
MEM_CONTEXT_TEMP_BEGIN()
|
MEM_CONTEXT_TEMP_BEGIN()
|
||||||
{
|
{
|
||||||
Buffer *const buffer = bufNew(ioBufferSize());
|
Buffer *const buffer = bufNew(ioBufferSize());
|
||||||
|
uint64_t copied = 0;
|
||||||
|
bool limitReached = false;
|
||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
|
// If a limit was specified then limit the buffer size on the last iteration
|
||||||
|
if (param.limit != NULL)
|
||||||
|
{
|
||||||
|
const uint64_t bufferLimit = varUInt64(param.limit) - copied;
|
||||||
|
|
||||||
|
if (bufferLimit < bufSize(buffer))
|
||||||
|
{
|
||||||
|
bufLimitSet(buffer, (size_t)bufferLimit);
|
||||||
|
limitReached = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy bytes
|
||||||
ioRead(source, buffer);
|
ioRead(source, buffer);
|
||||||
ioWrite(destination, buffer);
|
ioWrite(destination, buffer);
|
||||||
|
|
||||||
|
// Update bytes copied and clear the buffer
|
||||||
|
copied += bufUsed(buffer);
|
||||||
bufUsedZero(buffer);
|
bufUsedZero(buffer);
|
||||||
}
|
}
|
||||||
while (!ioReadEof(source));
|
while (!limitReached && !ioReadEof(source));
|
||||||
}
|
}
|
||||||
MEM_CONTEXT_TEMP_END();
|
MEM_CONTEXT_TEMP_END();
|
||||||
|
|
||||||
|
@ -16,7 +16,16 @@ Common IO functions.
|
|||||||
Functions
|
Functions
|
||||||
***********************************************************************************************************************************/
|
***********************************************************************************************************************************/
|
||||||
// Copy data from source to destination (both must be open and neither will be closed)
|
// Copy data from source to destination (both must be open and neither will be closed)
|
||||||
void ioCopy(IoRead *source, IoWrite *destination);
|
typedef struct IoCopyParam
|
||||||
|
{
|
||||||
|
VAR_PARAM_HEADER;
|
||||||
|
const Variant *limit; // Limit bytes to copy from source
|
||||||
|
} IoCopyParam;
|
||||||
|
|
||||||
|
#define ioCopyP(source, destination, ...) \
|
||||||
|
ioCopy(source, destination, (IoCopyParam){VAR_PARAM_INIT, __VA_ARGS__})
|
||||||
|
|
||||||
|
void ioCopy(IoRead *source, IoWrite *destination, IoCopyParam param);
|
||||||
|
|
||||||
// Read all IO into a buffer
|
// Read all IO into a buffer
|
||||||
Buffer *ioReadBuf(IoRead *read);
|
Buffer *ioReadBuf(IoRead *read);
|
||||||
|
@ -88,7 +88,7 @@ lockReadDataFile(const String *const lockFile, const int fd)
|
|||||||
Buffer *const buffer = bufNew(LOCK_BUFFER_SIZE);
|
Buffer *const buffer = bufNew(LOCK_BUFFER_SIZE);
|
||||||
IoWrite *const write = ioBufferWriteNewOpen(buffer);
|
IoWrite *const write = ioBufferWriteNewOpen(buffer);
|
||||||
|
|
||||||
ioCopy(ioFdReadNewOpen(lockFile, fd, 0), write);
|
ioCopyP(ioFdReadNewOpen(lockFile, fd, 0), write);
|
||||||
ioWriteClose(write);
|
ioWriteClose(write);
|
||||||
|
|
||||||
// Parse the file
|
// Parse the file
|
||||||
@ -127,7 +127,7 @@ lockWriteData(const LockType lockType)
|
|||||||
{
|
{
|
||||||
IoWrite *const write = ioFdWriteNewOpen(lockLocal.file[lockType].name, lockLocal.file[lockType].fd, 0);
|
IoWrite *const write = ioFdWriteNewOpen(lockLocal.file[lockType].name, lockLocal.file[lockType].fd, 0);
|
||||||
|
|
||||||
ioCopy(ioBufferReadNewOpen(BUFSTR(strNewFmt("%d" LF_Z "%s" LF_Z, getpid(), strZ(lockLocal.execId)))), write);
|
ioCopyP(ioBufferReadNewOpen(BUFSTR(strNewFmt("%d" LF_Z "%s" LF_Z, getpid(), strZ(lockLocal.execId)))), write);
|
||||||
ioWriteClose(write);
|
ioWriteClose(write);
|
||||||
}
|
}
|
||||||
MEM_CONTEXT_TEMP_END();
|
MEM_CONTEXT_TEMP_END();
|
||||||
|
@ -115,7 +115,7 @@ storageCopy(StorageRead *source, StorageWrite *destination)
|
|||||||
ioWriteOpen(storageWriteIo(destination));
|
ioWriteOpen(storageWriteIo(destination));
|
||||||
|
|
||||||
// Copy data from source to destination
|
// Copy data from source to destination
|
||||||
ioCopy(storageReadIo(source), storageWriteIo(destination));
|
ioCopyP(storageReadIo(source), storageWriteIo(destination));
|
||||||
|
|
||||||
// Close the source and destination files
|
// Close the source and destination files
|
||||||
ioReadClose(storageReadIo(source));
|
ioReadClose(storageReadIo(source));
|
||||||
|
@ -466,7 +466,7 @@ testRun(void)
|
|||||||
TEST_RESULT_STR_Z(ioReadLineParam(read, true), "1234", "read line without eof");
|
TEST_RESULT_STR_Z(ioReadLineParam(read, true), "1234", "read line without eof");
|
||||||
|
|
||||||
// -------------------------------------------------------------------------------------------------------------------------
|
// -------------------------------------------------------------------------------------------------------------------------
|
||||||
TEST_TITLE("ioCopy()");
|
TEST_TITLE("ioCopyP()");
|
||||||
|
|
||||||
ioBufferSizeSet(4);
|
ioBufferSizeSet(4);
|
||||||
|
|
||||||
@ -475,11 +475,26 @@ testRun(void)
|
|||||||
buffer = bufNew(0);
|
buffer = bufNew(0);
|
||||||
IoWrite *bufferWrite = ioBufferWriteNewOpen(buffer);
|
IoWrite *bufferWrite = ioBufferWriteNewOpen(buffer);
|
||||||
|
|
||||||
TEST_RESULT_VOID(ioCopy(bufferRead, bufferWrite), "copy buffer");
|
TEST_RESULT_VOID(ioCopyP(bufferRead, bufferWrite), "copy buffer");
|
||||||
TEST_RESULT_VOID(ioWriteClose(bufferWrite), "close write");
|
TEST_RESULT_VOID(ioWriteClose(bufferWrite), "close write");
|
||||||
|
|
||||||
TEST_RESULT_STR_Z(strNewBuf(buffer), "a test string", "check buffer");
|
TEST_RESULT_STR_Z(strNewBuf(buffer), "a test string", "check buffer");
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------------------------------------------------------
|
||||||
|
TEST_TITLE("ioCopyP() with limit");
|
||||||
|
|
||||||
|
ioBufferSizeSet(4);
|
||||||
|
|
||||||
|
bufferRead = ioBufferReadNewOpen(BUFSTRDEF("a test string"));
|
||||||
|
|
||||||
|
buffer = bufNew(0);
|
||||||
|
bufferWrite = ioBufferWriteNewOpen(buffer);
|
||||||
|
|
||||||
|
TEST_RESULT_VOID(ioCopyP(bufferRead, bufferWrite, .limit = VARUINT64(6)), "copy buffer");
|
||||||
|
TEST_RESULT_VOID(ioWriteClose(bufferWrite), "close write");
|
||||||
|
|
||||||
|
TEST_RESULT_STR_Z(strNewBuf(buffer), "a test", "check buffer");
|
||||||
|
|
||||||
// Read IO into a buffer
|
// Read IO into a buffer
|
||||||
// -------------------------------------------------------------------------------------------------------------------------
|
// -------------------------------------------------------------------------------------------------------------------------
|
||||||
ioBufferSizeSet(8);
|
ioBufferSizeSet(8);
|
||||||
|
@ -271,7 +271,7 @@ testRun(void)
|
|||||||
\
|
\
|
||||||
uint64_t benchMarkBegin = timeMSec(); \
|
uint64_t benchMarkBegin = timeMSec(); \
|
||||||
\
|
\
|
||||||
ioCopy(read, write); \
|
ioCopyP(read, write); \
|
||||||
\
|
\
|
||||||
ioReadClose(read); \
|
ioReadClose(read); \
|
||||||
ioWriteClose(write); \
|
ioWriteClose(write); \
|
||||||
|
Reference in New Issue
Block a user