You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-07-09 00:45:49 +02:00
Only process next filter in IoFilterGroup when input buffer is full or flushing.
This greatly reduces calls to filter processing, which is a performance benefit, but also makes the trace logs smaller and easier to read. However, this means that ioWriteFlush() will no longer work with filters since a full flush of IoFilterGroup would require an expensive reset. Currently ioWriteFlush() is not used in this scenario so for now just add an assert to ensure it stays that way.
This commit is contained in:
@ -21,6 +21,10 @@
|
||||
</release-improvement-list>
|
||||
|
||||
<release-development-list>
|
||||
<release-item>
|
||||
<p>Only process next filter in <code>IoFilterGroup</code> when input buffer is full or flushing.</p>
|
||||
</release-item>
|
||||
|
||||
<release-item>
|
||||
<p>Add macros to create constant <code>Buffer</code> objects.</p>
|
||||
</release-item>
|
||||
|
@ -262,21 +262,16 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
|
||||
// Checking filterIdx - 1 is safe because the first filter's filterData->input is always set to NULL when input
|
||||
// is NULL.
|
||||
if (input == NULL && filterData->input != NULL && !ioFilterDone(filterData->filter) &&
|
||||
ioFilterDone(ioFilterGroupGet(this, filterIdx - 1)->filter) && bufUsed(filterData->input) == 0)
|
||||
bufUsed(filterData->input) == 0)
|
||||
{
|
||||
filterData->input = NULL;
|
||||
}
|
||||
|
||||
ioFilterProcessInOut(filterData->filter, filterData->input, filterData->output);
|
||||
|
||||
// If there was no output then break out of filter processing because more input is needed
|
||||
if (bufUsed(filterData->output) == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
// Else if inputSame is set then the output buffer for this filter is full and it will need to be re-processed
|
||||
// with the same input once the output buffer is cleared
|
||||
else if (ioFilterInputSame(filterData->filter))
|
||||
// If inputSame is set then the output buffer for this filter is full and it will need to be re-processed with
|
||||
// the same input once the output buffer is cleared
|
||||
if (ioFilterInputSame(filterData->filter))
|
||||
{
|
||||
this->inputSame = true;
|
||||
}
|
||||
@ -284,6 +279,12 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
|
||||
// responsible for clearing it.
|
||||
else if (filterData->inputLocal != NULL)
|
||||
bufUsedZero(filterData->inputLocal);
|
||||
|
||||
// If the output buffer is not full and the filter is not done then more data is required
|
||||
if (!bufFull(filterData->output) && !ioFilterDone(filterData->filter))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Else the filter does not produce output. No need to flush these filters because they don't buffer data.
|
||||
|
@ -21,6 +21,7 @@ struct IoWrite
|
||||
Buffer *output; // Output buffer
|
||||
|
||||
#ifdef DEBUG
|
||||
bool filterGroupSet; // Was an IoFilterGroup set?
|
||||
bool opened; // Has the io been opened?
|
||||
bool closed; // Has the io been closed?
|
||||
#endif
|
||||
@ -153,6 +154,7 @@ ioWriteFlush(IoWrite *this)
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(this->opened && !this->closed);
|
||||
ASSERT(!this->filterGroupSet);
|
||||
|
||||
if (bufUsed(this->output) > 0)
|
||||
{
|
||||
@ -235,6 +237,11 @@ ioWriteFilterGroupSet(IoWrite *this, IoFilterGroup *filterGroup)
|
||||
ASSERT(this->filterGroup == NULL);
|
||||
ASSERT(!this->opened && !this->closed);
|
||||
|
||||
// Track whether a filter group was set to prevent flush() from being called later
|
||||
#ifdef DEBUG
|
||||
this->filterGroupSet = true;
|
||||
#endif
|
||||
|
||||
this->filterGroup = filterGroup;
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
|
@ -460,13 +460,8 @@ testRun(void)
|
||||
|
||||
TEST_RESULT_VOID(ioWrite(ioBufferWriteIo(bufferWrite), BUFSTRDEF("Z")), " write string");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\n", " no change because output buffer is not full");
|
||||
TEST_RESULT_VOID(ioWriteFlush(ioBufferWriteIo(bufferWrite)), " flush output");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ", " check output is flushed");
|
||||
TEST_RESULT_VOID(ioWriteFlush(ioBufferWriteIo(bufferWrite)), " flush again (nothing to flush)");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ", " check output is unchanged");
|
||||
|
||||
TEST_RESULT_VOID(ioWrite(ioBufferWriteIo(bufferWrite), BUFSTRDEF("12345")), " write bytes");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ112233445", " check write");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ1122334455", " check write");
|
||||
|
||||
TEST_RESULT_VOID(ioWriteClose(ioBufferWriteIo(bufferWrite)), " close buffer write object");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\nZZ1122334455XXXY", " check write after close");
|
||||
@ -506,6 +501,7 @@ testRun(void)
|
||||
// Write a line to be read
|
||||
TEST_RESULT_VOID(ioWriteLine(ioHandleWriteIo(write), strNew("test string 1")), "write test string");
|
||||
ioWriteFlush(ioHandleWriteIo(write));
|
||||
ioWriteFlush(ioHandleWriteIo(write));
|
||||
|
||||
// Sleep so the other side will timeout
|
||||
const Buffer *buffer = BUFSTRDEF("12345678");
|
||||
|
Reference in New Issue
Block a user