mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2024-12-12 10:04:14 +02:00
Add ioReadLine()/ioWriteLine() to IoRead/IoWrite objects.
Allow a single linefeed-terminated line to be read or written. This is useful for various protocol implementations, including HTTP and pgBackRest's protocol. On read the maximum line size is limited to buffer-size to prevent runaway memory usage in case a linefeed is not found. This seems fine for HTTP but we may need to revisit this decision when implementing the pgBackRest protocol. Another option would be to increase the minimum buffer size (currently 16KB).
This commit is contained in:
parent
db8dce7adc
commit
68110d04b2
@ -94,6 +94,10 @@
|
||||
<p>Info objects now parse JSON and use specified storage.</p>
|
||||
</release-item>
|
||||
|
||||
<release-item>
|
||||
<p>Add <code>ioReadLine()</code>/<code>ioWriteLine()</code> to <code>IoRead</code>/<code>IoWrite</code> objects.</p>
|
||||
</release-item>
|
||||
|
||||
<release-item>
|
||||
<p>Add helper for repository storage.</p>
|
||||
</release-item>
|
||||
|
@ -1,6 +1,8 @@
|
||||
/***********************************************************************************************************************************
|
||||
IO Read Interface
|
||||
***********************************************************************************************************************************/
|
||||
#include <string.h>
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/io/io.h"
|
||||
#include "common/io/read.intern.h"
|
||||
@ -17,6 +19,7 @@ struct IoRead
|
||||
IoReadInterface interface; // Driver interface
|
||||
IoFilterGroup *filterGroup; // IO filters
|
||||
Buffer *input; // Input buffer
|
||||
Buffer *output; // Output buffer (holds extra data from line read)
|
||||
|
||||
bool eofAll; // Is the read done (read and filters complete)?
|
||||
|
||||
@ -105,11 +108,12 @@ ioReadEofDriver(const IoRead *this)
|
||||
/***********************************************************************************************************************************
|
||||
Read data from IO and process filters
|
||||
***********************************************************************************************************************************/
|
||||
size_t
|
||||
ioRead(IoRead *this, Buffer *buffer)
|
||||
static void
|
||||
ioReadInternal(IoRead *this, Buffer *buffer)
|
||||
{
|
||||
FUNCTION_DEBUG_BEGIN(logLevelTrace);
|
||||
FUNCTION_DEBUG_PARAM(IO_READ, this);
|
||||
FUNCTION_DEBUG_PARAM(BUFFER, buffer);
|
||||
|
||||
FUNCTION_TEST_ASSERT(this != NULL);
|
||||
FUNCTION_TEST_ASSERT(buffer != NULL);
|
||||
@ -117,8 +121,6 @@ ioRead(IoRead *this, Buffer *buffer)
|
||||
FUNCTION_DEBUG_END();
|
||||
|
||||
// Loop until EOF or the output buffer is full
|
||||
size_t outputRemains = bufRemains(buffer);
|
||||
|
||||
while (!this->eofAll && bufRemains(buffer) > 0)
|
||||
{
|
||||
// Process input buffer again to get more output
|
||||
@ -147,9 +149,103 @@ ioRead(IoRead *this, Buffer *buffer)
|
||||
this->eofAll = ioReadEofDriver(this) && ioFilterGroupDone(this->filterGroup);
|
||||
}
|
||||
|
||||
FUNCTION_DEBUG_RESULT_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Read data and use buffered line read output when present
|
||||
***********************************************************************************************************************************/
|
||||
size_t
|
||||
ioRead(IoRead *this, Buffer *buffer)
|
||||
{
|
||||
FUNCTION_DEBUG_BEGIN(logLevelTrace);
|
||||
FUNCTION_DEBUG_PARAM(IO_READ, this);
|
||||
FUNCTION_DEBUG_PARAM(BUFFER, buffer);
|
||||
FUNCTION_DEBUG_PARAM(BUFFER, this->output);
|
||||
|
||||
FUNCTION_TEST_ASSERT(this != NULL);
|
||||
FUNCTION_TEST_ASSERT(buffer != NULL);
|
||||
FUNCTION_TEST_ASSERT(this->opened && !this->closed);
|
||||
FUNCTION_DEBUG_END();
|
||||
|
||||
// Store size of remaining portion of buffer to calculate total read at the end
|
||||
size_t outputRemains = bufRemains(buffer);
|
||||
|
||||
// Use any data in the output buffer left over from a line read
|
||||
if (this->output != NULL && bufUsed(this->output) > 0 && bufRemains(buffer) > 0)
|
||||
{
|
||||
// Determine how much data should be copied
|
||||
size_t size = bufUsed(this->output) > bufRemains(buffer) ? bufRemains(buffer) : bufUsed(this->output);
|
||||
|
||||
// Copy data to the user buffer
|
||||
bufCatSub(buffer, this->output, 0, size);
|
||||
|
||||
// Remove copied data from the output buffer
|
||||
memmove(bufPtr(this->output), bufPtr(this->output) + size, bufUsed(this->output) - size);
|
||||
bufUsedSet(this->output, bufUsed(this->output) - size);
|
||||
}
|
||||
|
||||
// Read data
|
||||
ioReadInternal(this, buffer);
|
||||
|
||||
FUNCTION_DEBUG_RESULT(SIZE, outputRemains - bufRemains(buffer));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Read linefeed-terminated string
|
||||
***********************************************************************************************************************************/
|
||||
String *
|
||||
ioReadLine(IoRead *this)
|
||||
{
|
||||
FUNCTION_DEBUG_BEGIN(logLevelTrace);
|
||||
FUNCTION_DEBUG_PARAM(IO_READ, this);
|
||||
FUNCTION_DEBUG_PARAM(BUFFER, this->output);
|
||||
|
||||
FUNCTION_TEST_ASSERT(this != NULL);
|
||||
FUNCTION_TEST_ASSERT(this->opened && !this->closed);
|
||||
FUNCTION_DEBUG_END();
|
||||
|
||||
// Allocate the output buffer if it has not already been allocated. This buffer is not allocated at object creation because it
|
||||
// is not always used.
|
||||
if (this->output == NULL)
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(this->memContext)
|
||||
{
|
||||
this->output = bufNew(ioBufferSize());
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
}
|
||||
|
||||
// Read more data if there is any. The entire string we are searching for must fit within the buffer so we'll make sure that
|
||||
// the buffer is full
|
||||
ioReadInternal(this, this->output);
|
||||
|
||||
// If some data was read search for a linefeed
|
||||
String *result = NULL;
|
||||
|
||||
if (bufUsed(this->output) > 0)
|
||||
{
|
||||
// Search for a linefeed in the buffer
|
||||
char *linefeed = memchr(bufPtr(this->output), '\n', bufUsed(this->output));
|
||||
|
||||
// A linefeed was found so get the string
|
||||
if (linefeed != NULL)
|
||||
{
|
||||
// Get the string size
|
||||
size_t size = (size_t)(linefeed - (char *)bufPtr(this->output) + 1);
|
||||
|
||||
// Create the string
|
||||
result = strNewN((char *)bufPtr(this->output), size - 1);
|
||||
|
||||
// Remove string from the output buffer
|
||||
memmove(bufPtr(this->output), bufPtr(this->output) + size, bufUsed(this->output) - size);
|
||||
bufUsedSet(this->output, bufUsed(this->output) - size);
|
||||
}
|
||||
}
|
||||
|
||||
FUNCTION_DEBUG_RESULT(STRING, result);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Close the IO
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -22,6 +22,7 @@ Functions
|
||||
***********************************************************************************************************************************/
|
||||
bool ioReadOpen(IoRead *this);
|
||||
size_t ioRead(IoRead *this, Buffer *buffer);
|
||||
String *ioReadLine(IoRead *this);
|
||||
void ioReadClose(IoRead *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
|
@ -1,6 +1,8 @@
|
||||
/***********************************************************************************************************************************
|
||||
IO Write Interface
|
||||
***********************************************************************************************************************************/
|
||||
#include <string.h>
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/io/io.h"
|
||||
#include "common/io/write.intern.h"
|
||||
@ -112,6 +114,37 @@ ioWrite(IoWrite *this, const Buffer *buffer)
|
||||
FUNCTION_DEBUG_RESULT_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Write linefeed-terminated string
|
||||
***********************************************************************************************************************************/
|
||||
void
|
||||
ioWriteLine(IoWrite *this, const String *string)
|
||||
{
|
||||
FUNCTION_DEBUG_BEGIN(logLevelTrace);
|
||||
FUNCTION_DEBUG_PARAM(IO_WRITE, this);
|
||||
FUNCTION_DEBUG_PARAM(STRING, string);
|
||||
|
||||
FUNCTION_TEST_ASSERT(this != NULL);
|
||||
FUNCTION_TEST_ASSERT(string != NULL);
|
||||
FUNCTION_TEST_ASSERT(this->opened && !this->closed);
|
||||
FUNCTION_DEBUG_END();
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Load a buffer with the linefeed-terminated string
|
||||
Buffer *buffer = bufNew(strSize(string) + 1);
|
||||
memcpy(bufPtr(buffer), strPtr(string), strSize(string));
|
||||
bufPtr(buffer)[strSize(string)] = '\n';
|
||||
bufUsedSet(buffer, bufSize(buffer));
|
||||
|
||||
// Write the string
|
||||
ioWrite(this, buffer);
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END()
|
||||
|
||||
FUNCTION_DEBUG_RESULT_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Close the IO and write any additional data that has not been written yet
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -21,6 +21,7 @@ Functions
|
||||
***********************************************************************************************************************************/
|
||||
void ioWriteOpen(IoWrite *this);
|
||||
void ioWrite(IoWrite *this, const Buffer *buffer);
|
||||
void ioWriteLine(IoWrite *this, const String *string);
|
||||
void ioWriteClose(IoWrite *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
|
@ -122,7 +122,7 @@ storageDriverPosixFileRead(StorageDriverPosixFileRead *this, Buffer *buffer)
|
||||
size_t expectedBytes = bufRemains(buffer);
|
||||
actualBytes = read(this->handle, bufRemainsPtr(buffer), expectedBytes);
|
||||
|
||||
// Error occurred during write
|
||||
// Error occurred during read
|
||||
if (actualBytes == -1)
|
||||
THROW_SYS_ERROR_FMT(FileReadError, "unable to read '%s'", strPtr(this->name));
|
||||
|
||||
|
@ -315,6 +315,46 @@ testRun(void)
|
||||
|
||||
TEST_RESULT_VOID(ioFilterGroupFree(filterGroup), " free filter group object");
|
||||
TEST_RESULT_VOID(ioFilterGroupFree(NULL), " free NULL filter group object");
|
||||
|
||||
// Mixed line and buffer read
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
ioBufferSizeSet(5);
|
||||
read = ioBufferReadIo(ioBufferReadNew(bufNewZ("AAA123\n1234\n\n12\nBDDDEFF")));
|
||||
ioReadOpen(read);
|
||||
buffer = bufNew(3);
|
||||
|
||||
// Start with a buffer read
|
||||
TEST_RESULT_INT(ioRead(read, buffer), 3, "read buffer");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AAA", " check buffer");
|
||||
|
||||
// Do line reads of various lengths
|
||||
TEST_RESULT_STR(strPtr(ioReadLine(read)), "123", "read line");
|
||||
TEST_RESULT_STR(strPtr(ioReadLine(read)), "1234", "read line");
|
||||
TEST_RESULT_STR(strPtr(ioReadLine(read)), "", "read line");
|
||||
TEST_RESULT_STR(strPtr(ioReadLine(read)), "12", "read line");
|
||||
|
||||
// Read what was left in the line buffer
|
||||
TEST_RESULT_INT(ioRead(read, buffer), 0, "read buffer");
|
||||
bufUsedSet(buffer, 2);
|
||||
TEST_RESULT_INT(ioRead(read, buffer), 1, "read buffer");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AAB", " check buffer");
|
||||
bufUsedSet(buffer, 0);
|
||||
|
||||
// Now do a full buffer read from the input
|
||||
TEST_RESULT_INT(ioRead(read, buffer), 3, "read buffer");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "DDD", " check buffer");
|
||||
|
||||
// Read line doesn't work without a linefeed
|
||||
TEST_RESULT_STR(strPtr(ioReadLine(read)), NULL, "read line");
|
||||
|
||||
// But those bytes can be picked up by a buffer read
|
||||
bufUsedSet(buffer, 0);
|
||||
TEST_RESULT_INT(ioRead(read, buffer), 3, "read buffer");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "EFF", " check buffer");
|
||||
|
||||
// Nothing left to read
|
||||
TEST_RESULT_STR(strPtr(ioReadLine(read)), NULL, "read line");
|
||||
TEST_RESULT_INT(ioRead(read, buffer), 0, "read buffer");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
@ -359,16 +399,16 @@ testRun(void)
|
||||
TEST_RESULT_VOID(ioWriteFilterGroupSet(ioBufferWriteIo(bufferWrite), filterGroup), " add filter group to write io");
|
||||
|
||||
TEST_RESULT_VOID(ioWriteOpen(ioBufferWriteIo(bufferWrite)), " open buffer write object");
|
||||
TEST_RESULT_VOID(ioWrite(ioBufferWriteIo(bufferWrite), bufNewZ("ABC")), " write 3 bytes");
|
||||
TEST_RESULT_VOID(ioWriteLine(ioBufferWriteIo(bufferWrite), strNew("AB")), " write string");
|
||||
TEST_RESULT_VOID(ioWrite(ioBufferWriteIo(bufferWrite), bufNew(0)), " write 0 bytes");
|
||||
TEST_RESULT_VOID(ioWrite(ioBufferWriteIo(bufferWrite), NULL), " write 0 bytes");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABBCC", " check write");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\n", " check write");
|
||||
|
||||
TEST_RESULT_VOID(ioWrite(ioBufferWriteIo(bufferWrite), bufNewZ("12345")), " write 4 bytes");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABBCC112233445", " check write");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\n112233445", " check write");
|
||||
|
||||
TEST_RESULT_VOID(ioWriteClose(ioBufferWriteIo(bufferWrite)), " close buffer write object");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABBCC1122334455XXX", " check write after close");
|
||||
TEST_RESULT_STR(strPtr(strNewBuf(buffer)), "AABB\n\n1122334455XXX", " check write after close");
|
||||
|
||||
TEST_RESULT_PTR(ioWriteFilterGroup(ioBufferWriteIo(bufferWrite)), filterGroup, " check filter group");
|
||||
TEST_RESULT_UINT(
|
||||
|
Loading…
Reference in New Issue
Block a user