1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-18 04:58:51 +02:00

Add facility for reading and writing adhoc protocol output.

Pushing output through a JSON blob is not practical if the output is extremely large, e.g. a backup manifest with 100K+ files.

Add read/write routines so that output can be returned in chunks but errors will still be detected.
This commit is contained in:
David Steele 2019-11-16 17:05:34 -05:00
parent 90e19d99ba
commit 6827a13f3a
7 changed files with 172 additions and 28 deletions

View File

@ -19,6 +19,7 @@ BUFFER_STRDEF_EXTERN(BRACKETL_BUF, "[");
BUFFER_STRDEF_EXTERN(BRACKETR_BUF, "]");
BUFFER_STRDEF_EXTERN(COMMA_BUF, ",");
BUFFER_STRDEF_EXTERN(CR_BUF, "\r");
BUFFER_STRDEF_EXTERN(DOT_BUF, ".");
BUFFER_STRDEF_EXTERN(EQ_BUF, "=");
BUFFER_STRDEF_EXTERN(LF_BUF, "\n");
BUFFER_STRDEF_EXTERN(QUOTED_BUF, "\"");

View File

@ -114,6 +114,7 @@ BUFFER_DECLARE(BRACKETL_BUF);
BUFFER_DECLARE(BRACKETR_BUF);
BUFFER_DECLARE(COMMA_BUF);
BUFFER_DECLARE(CR_BUF);
BUFFER_DECLARE(DOT_BUF);
BUFFER_DECLARE(EQ_BUF);
BUFFER_DECLARE(LF_BUF);
BUFFER_DECLARE(QUOTED_BUF);

View File

@ -138,6 +138,49 @@ protocolClientNew(const String *name, const String *service, IoRead *read, IoWri
/***********************************************************************************************************************************
Read the command output
***********************************************************************************************************************************/
// Helper to process errors
static void
protocolClientProcessError(ProtocolClient *this, KeyValue *errorKv)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, this);
FUNCTION_LOG_PARAM(KEY_VALUE, errorKv);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(errorKv != NULL);
MEM_CONTEXT_TEMP_BEGIN()
{
// Process error if any
const Variant *error = kvGet(errorKv, VARSTR(PROTOCOL_ERROR_STR));
if (error != NULL)
{
const ErrorType *type = errorTypeFromCode(varIntForce(error));
const String *message = varStr(kvGet(errorKv, VARSTR(PROTOCOL_OUTPUT_STR)));
// Required part of the message
String *throwMessage = strNewFmt(
"%s: %s", strPtr(this->errorPrefix), message == NULL ? "no details available" : strPtr(message));
// Add stack trace if the error is an assertion or debug-level logging is enabled
if (type == &AssertError || logAny(logLevelDebug))
{
const String *stack = varStr(kvGet(errorKv, VARSTR(PROTOCOL_ERROR_STACK_STR)));
strCat(throwMessage, "\n");
strCat(throwMessage, stack == NULL ? "no stack trace available" : strPtr(stack));
}
THROWP(type, strPtr(throwMessage));
}
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN_VOID();
}
const Variant *
protocolClientReadOutput(ProtocolClient *this, bool outputRequired)
{
@ -157,28 +200,7 @@ protocolClientReadOutput(ProtocolClient *this, bool outputRequired)
KeyValue *responseKv = varKv(jsonToVar(response));
// Process error if any
const Variant *error = kvGet(responseKv, VARSTR(PROTOCOL_ERROR_STR));
if (error != NULL)
{
const ErrorType *type = errorTypeFromCode(varIntForce(error));
const String *message = varStr(kvGet(responseKv, VARSTR(PROTOCOL_OUTPUT_STR)));
// Required part of the message
String *throwMessage = strNewFmt(
"%s: %s", strPtr(this->errorPrefix), message == NULL ? "no details available" : strPtr(message));
// Add stack trace if the error is an assertion or debug-level logging is enabled
if (type == &AssertError || logAny(logLevelDebug))
{
const String *stack = varStr(kvGet(responseKv, VARSTR(PROTOCOL_ERROR_STACK_STR)));
strCat(throwMessage, "\n");
strCat(throwMessage, stack == NULL ? "no stack trace available" : strPtr(stack));
}
THROWP(type, strPtr(throwMessage));
}
protocolClientProcessError(this, responseKv);
// Get output
result = kvGet(responseKv, VARSTR(PROTOCOL_OUTPUT_STR));
@ -265,6 +287,50 @@ protocolClientNoOp(ProtocolClient *this)
FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Read a line
***********************************************************************************************************************************/
String *
protocolClientReadLine(ProtocolClient *this)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
String *result = NULL;
MEM_CONTEXT_TEMP_BEGIN()
{
result = ioReadLine(this->read);
if (strSize(result) == 0)
{
THROW(FormatError, "unexpected empty line");
}
else if (strPtr(result)[0] == '{')
{
KeyValue *responseKv = varKv(jsonToVar(result));
// Process expected error
protocolClientProcessError(this, responseKv);
// If not an error then there is probably a protocol bug
THROW(FormatError, "expected error but got output");
}
else if (strPtr(result)[0] != '.')
THROW_FMT(FormatError, "invalid prefix in '%s'", strPtr(result));
memContextSwitch(MEM_CONTEXT_OLD());
result = strSub(result, 1);
memContextSwitch(MEM_CONTEXT_TEMP());
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN(STRING, result);
}
/***********************************************************************************************************************************
Get read interface
***********************************************************************************************************************************/

View File

@ -50,6 +50,7 @@ Functions
const Variant *protocolClientExecute(ProtocolClient *this, const ProtocolCommand *command, bool outputRequired);
ProtocolClient *protocolClientMove(ProtocolClient *this, MemContext *parentNew);
void protocolClientNoOp(ProtocolClient *this);
String *protocolClientReadLine(ProtocolClient *this);
const Variant *protocolClientReadOutput(ProtocolClient *this, bool outputRequired);
void protocolClientWriteCommand(ProtocolClient *this, const ProtocolCommand *command);

View File

@ -223,6 +223,32 @@ protocolServerResponse(ProtocolServer *this, const Variant *output)
FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Write a line
***********************************************************************************************************************************/
void
protocolServerWriteLine(const ProtocolServer *this, const String *line)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
FUNCTION_LOG_PARAM(STRING, line);
FUNCTION_LOG_END();
ASSERT(this != NULL);
// Dot indicates the start of an lf-terminated line
ioWrite(this->write, DOT_BUF);
// Write the line if it exists
if (line != NULL)
ioWriteStr(this->write, line);
// Terminate with a linefeed
ioWrite(this->write, LF_BUF);
FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Get read interface
***********************************************************************************************************************************/

View File

@ -33,6 +33,7 @@ void protocolServerProcess(ProtocolServer *this);
void protocolServerResponse(ProtocolServer *this, const Variant *output);
void protocolServerHandlerAdd(ProtocolServer *this, ProtocolServerProcessHandler handler);
ProtocolServer *protocolServerMove(ProtocolServer *this, MemContext *parentNew);
void protocolServerWriteLine(const ProtocolServer *this, const String *line);
/***********************************************************************************************************************************
Getters

View File

@ -43,7 +43,8 @@ testServerProtocol(const String *command, const VariantList *paramList, Protocol
else if (strEq(command, strNew("request-complex")))
{
protocolServerResponse(server, varNewBool(false));
ioWriteStrLine(protocolServerIoWrite(server), strNew("LINEOFTEXT"));
protocolServerWriteLine(server, strNew("LINEOFTEXT"));
protocolServerWriteLine(server, NULL);
ioWriteFlush(protocolServerIoWrite(server));
}
else
@ -403,9 +404,31 @@ testRun(void)
// Send output
TEST_RESULT_STR(strPtr(ioReadLine(read)), "{\"cmd\":\"test\"}", "test command");
ioWriteStrLine(write, strNew(".OUTPUT"));
ioWriteStrLine(write, strNew("{\"out\":[\"value1\",\"value2\"]}"));
ioWriteFlush(write);
// invalid line
TEST_RESULT_STR(strPtr(ioReadLine(read)), "{\"cmd\":\"invalid-line\"}", "invalid line command");
ioWrite(write, LF_BUF);
ioWriteFlush(write);
// error instead of output
TEST_RESULT_STR(
strPtr(ioReadLine(read)), "{\"cmd\":\"error-instead-of-output\"}", "error instead of output command");
ioWriteStrLine(write, strNew("{\"err\":255}"));
ioWriteFlush(write);
// unexpected output
TEST_RESULT_STR(strPtr(ioReadLine(read)), "{\"cmd\":\"unexpected-output\"}", "unexpected output");
ioWriteStrLine(write, strNew("{}"));
ioWriteFlush(write);
// invalid prefix
TEST_RESULT_STR(strPtr(ioReadLine(read)), "{\"cmd\":\"invalid-prefix\"}", "invalid prefix");
ioWriteStrLine(write, strNew("~line"));
ioWriteFlush(write);
// Wait for exit
TEST_RESULT_STR(strPtr(ioReadLine(read)), "{\"cmd\":\"exit\"}", "exit command");
}
@ -472,14 +495,38 @@ testRun(void)
// Get command output
const VariantList *output = NULL;
TEST_ASSIGN(
output,
varVarLst(protocolClientExecute(client, protocolCommandNew(strNew("test")), true)),
"execute command with output");
TEST_RESULT_VOID(
protocolClientWriteCommand(client, protocolCommandNew(strNew("test"))), "execute command with output");
TEST_RESULT_STR_Z(protocolClientReadLine(client), "OUTPUT", "check output");
TEST_ASSIGN(output, varVarLst(protocolClientReadOutput(client, true)), "execute command with output");
TEST_RESULT_UINT(varLstSize(output), 2, "check output size");
TEST_RESULT_STR(strPtr(varStr(varLstGet(output, 0))), "value1", "check value1");
TEST_RESULT_STR(strPtr(varStr(varLstGet(output, 1))), "value2", "check value2");
// Invalid line
TEST_RESULT_VOID(
protocolClientWriteCommand(client, protocolCommandNew(strNew("invalid-line"))),
"execute command that returns invalid line");
TEST_ERROR(protocolClientReadLine(client), FormatError, "unexpected empty line");
// Error instead of output
TEST_RESULT_VOID(
protocolClientWriteCommand(client, protocolCommandNew(strNew("error-instead-of-output"))),
"execute command that returns error instead of output");
TEST_ERROR(protocolClientReadLine(client), UnknownError, "raised from test client: no details available");
// Unexpected output
TEST_RESULT_VOID(
protocolClientWriteCommand(client, protocolCommandNew(strNew("unexpected-output"))),
"execute command that returns unexpected output");
TEST_ERROR(protocolClientReadLine(client), FormatError, "expected error but got output");
// Invalid prefix
TEST_RESULT_VOID(
protocolClientWriteCommand(client, protocolCommandNew(strNew("invalid-prefix"))),
"execute command that returns an invalid prefix");
TEST_ERROR(protocolClientReadLine(client), FormatError, "invalid prefix in '~line'");
// Free client
TEST_RESULT_VOID(protocolClientFree(client), "free client");
}
@ -537,7 +584,8 @@ testRun(void)
TEST_RESULT_VOID(ioWriteStrLine(write, strNew("{\"cmd\":\"request-complex\"}")), "write complex request");
TEST_RESULT_VOID(ioWriteFlush(write), "flush complex request");
TEST_RESULT_STR(strPtr(ioReadLine(read)), "{\"out\":false}", "complex request result");
TEST_RESULT_STR(strPtr(ioReadLine(read)), "LINEOFTEXT", "complex request result");
TEST_RESULT_STR_Z(ioReadLine(read), ".LINEOFTEXT", "complex request result");
TEST_RESULT_STR_Z(ioReadLine(read), ".", "complex request result");
// Exit
TEST_RESULT_VOID(ioWriteStrLine(write, strNew("{\"cmd\":\"exit\"}")), "write exit");