You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-07-15 01:04:37 +02:00
Improve IoChunkedRead end-of-file handling.
Determine end-of-file earlier to improve throughput. Also clean up some comments and formatting.
This commit is contained in:
@ -19,6 +19,7 @@
|
|||||||
<release-feature-list>
|
<release-feature-list>
|
||||||
<release-item>
|
<release-item>
|
||||||
<commit subject="Add block incremental to real/all test output."/>
|
<commit subject="Add block incremental to real/all test output."/>
|
||||||
|
<commit subject="Improve IoChunkedRead end-of-file handling."/>
|
||||||
|
|
||||||
<release-item-contributor-list>
|
<release-item-contributor-list>
|
||||||
<release-item-contributor id="david.steele"/>
|
<release-item-contributor id="david.steele"/>
|
||||||
|
@ -23,13 +23,64 @@ typedef struct IoChunkedRead
|
|||||||
/***********************************************************************************************************************************
|
/***********************************************************************************************************************************
|
||||||
Macros for function logging
|
Macros for function logging
|
||||||
***********************************************************************************************************************************/
|
***********************************************************************************************************************************/
|
||||||
#define FUNCTION_LOG_IO_CHUNKED_READ_TYPE \
|
#define FUNCTION_LOG_IO_CHUNKED_READ_TYPE \
|
||||||
IoChunkedRead *
|
IoChunkedRead *
|
||||||
#define FUNCTION_LOG_IO_CHUNKED_READ_FORMAT(value, buffer, bufferSize) \
|
#define FUNCTION_LOG_IO_CHUNKED_READ_FORMAT(value, buffer, bufferSize) \
|
||||||
objNameToLog(value, "IoChunkedRead", buffer, bufferSize)
|
objNameToLog(value, "IoChunkedRead", buffer, bufferSize)
|
||||||
|
|
||||||
/***********************************************************************************************************************************
|
/***********************************************************************************************************************************
|
||||||
Read data from the buffer
|
Read next chunk size
|
||||||
|
***********************************************************************************************************************************/
|
||||||
|
static bool
|
||||||
|
ioChunkedReadNext(THIS_VOID)
|
||||||
|
{
|
||||||
|
THIS(IoChunkedRead);
|
||||||
|
|
||||||
|
FUNCTION_TEST_BEGIN();
|
||||||
|
FUNCTION_TEST_PARAM(IO_CHUNKED_READ, this);
|
||||||
|
FUNCTION_TEST_END();
|
||||||
|
|
||||||
|
const uint64_t chunkDelta = ioReadVarIntU64(this->read);
|
||||||
|
|
||||||
|
// Stop when chunk delta is zero, which indicates the end of the chunk list
|
||||||
|
if (chunkDelta == 0)
|
||||||
|
{
|
||||||
|
this->eof = true;
|
||||||
|
FUNCTION_TEST_RETURN(BOOL, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate next chunk size from delta
|
||||||
|
if (this->chunkLast == 0)
|
||||||
|
this->chunkRemains = (size_t)chunkDelta;
|
||||||
|
else
|
||||||
|
this->chunkRemains = (size_t)(cvtInt64FromZigZag(chunkDelta - 1) + (int64_t)this->chunkLast);
|
||||||
|
|
||||||
|
this->chunkLast = this->chunkRemains;
|
||||||
|
|
||||||
|
FUNCTION_TEST_RETURN(BOOL, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/***********************************************************************************************************************************
|
||||||
|
Read first chunk size
|
||||||
|
***********************************************************************************************************************************/
|
||||||
|
static bool
|
||||||
|
ioChunkedReadOpen(THIS_VOID)
|
||||||
|
{
|
||||||
|
THIS(IoChunkedRead);
|
||||||
|
|
||||||
|
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||||
|
FUNCTION_LOG_PARAM(IO_CHUNKED_READ, this);
|
||||||
|
FUNCTION_LOG_END();
|
||||||
|
|
||||||
|
ASSERT(this != NULL);
|
||||||
|
|
||||||
|
ioChunkedReadNext(this);
|
||||||
|
|
||||||
|
FUNCTION_LOG_RETURN(BOOL, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/***********************************************************************************************************************************
|
||||||
|
Read next chunk or partial chunk
|
||||||
***********************************************************************************************************************************/
|
***********************************************************************************************************************************/
|
||||||
static size_t
|
static size_t
|
||||||
ioChunkedRead(THIS_VOID, Buffer *const buffer, const bool block)
|
ioChunkedRead(THIS_VOID, Buffer *const buffer, const bool block)
|
||||||
@ -50,27 +101,6 @@ ioChunkedRead(THIS_VOID, Buffer *const buffer, const bool block)
|
|||||||
// Keep reading until the output buffer is full
|
// Keep reading until the output buffer is full
|
||||||
while (!bufFull(buffer))
|
while (!bufFull(buffer))
|
||||||
{
|
{
|
||||||
// If no data remaining in chunk then read the next chunk header
|
|
||||||
if (this->chunkRemains == 0)
|
|
||||||
{
|
|
||||||
const uint64_t chunkDelta = ioReadVarIntU64(this->read);
|
|
||||||
|
|
||||||
// Stop when chunk delta is zero, which indicates the end of the chunk list
|
|
||||||
if (chunkDelta == 0)
|
|
||||||
{
|
|
||||||
this->eof = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate next chunk size from delta
|
|
||||||
if (this->chunkLast == 0)
|
|
||||||
this->chunkRemains = (size_t)chunkDelta;
|
|
||||||
else
|
|
||||||
this->chunkRemains = (size_t)(cvtInt64FromZigZag(chunkDelta - 1) + (int64_t)this->chunkLast);
|
|
||||||
|
|
||||||
this->chunkLast = this->chunkRemains;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the entire chunk will fit in the output buffer
|
// If the entire chunk will fit in the output buffer
|
||||||
if (this->chunkRemains < bufRemains(buffer))
|
if (this->chunkRemains < bufRemains(buffer))
|
||||||
{
|
{
|
||||||
@ -79,6 +109,10 @@ ioChunkedRead(THIS_VOID, Buffer *const buffer, const bool block)
|
|||||||
|
|
||||||
actualBytes += this->chunkRemains;
|
actualBytes += this->chunkRemains;
|
||||||
this->chunkRemains = 0;
|
this->chunkRemains = 0;
|
||||||
|
|
||||||
|
// Read the next chunk header
|
||||||
|
if (!ioChunkedReadNext(this))
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
// Else only part of the chunk will fit in the output
|
// Else only part of the chunk will fit in the output
|
||||||
else
|
else
|
||||||
@ -94,7 +128,7 @@ ioChunkedRead(THIS_VOID, Buffer *const buffer, const bool block)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/***********************************************************************************************************************************
|
/***********************************************************************************************************************************
|
||||||
Have all bytes been read from the buffer?
|
Have all chunks been read?
|
||||||
***********************************************************************************************************************************/
|
***********************************************************************************************************************************/
|
||||||
static bool
|
static bool
|
||||||
ioChunkedReadEof(THIS_VOID)
|
ioChunkedReadEof(THIS_VOID)
|
||||||
@ -131,7 +165,7 @@ ioChunkedReadNew(IoRead *const read)
|
|||||||
.read = read,
|
.read = read,
|
||||||
};
|
};
|
||||||
|
|
||||||
this = ioReadNewP(driver, .eof = ioChunkedReadEof, .read = ioChunkedRead);
|
this = ioReadNewP(driver, .eof = ioChunkedReadEof, .open = ioChunkedReadOpen, .read = ioChunkedRead);
|
||||||
}
|
}
|
||||||
OBJ_NEW_END();
|
OBJ_NEW_END();
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user