mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2024-12-14 10:13:05 +02:00
Add state to ProtocolClient.
This is currently only useful for debugging, but in the future the state may be used for resetting the protocol when something goes wrong.
This commit is contained in:
parent
2c65fed80f
commit
f2aeb30fc7
@ -19,12 +19,34 @@ STRING_EXTERN(PROTOCOL_GREETING_NAME_STR, PROTOCOL_GRE
|
|||||||
STRING_EXTERN(PROTOCOL_GREETING_SERVICE_STR, PROTOCOL_GREETING_SERVICE);
|
STRING_EXTERN(PROTOCOL_GREETING_SERVICE_STR, PROTOCOL_GREETING_SERVICE);
|
||||||
STRING_EXTERN(PROTOCOL_GREETING_VERSION_STR, PROTOCOL_GREETING_VERSION);
|
STRING_EXTERN(PROTOCOL_GREETING_VERSION_STR, PROTOCOL_GREETING_VERSION);
|
||||||
|
|
||||||
|
/***********************************************************************************************************************************
|
||||||
|
Client state enum
|
||||||
|
***********************************************************************************************************************************/
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
// Client is waiting for a command
|
||||||
|
protocolClientStateIdle = STRID5("idle", 0x2b0890),
|
||||||
|
|
||||||
|
// Command put is in progress
|
||||||
|
protocolClientStateCommandPut = STRID5("cmd-put", 0x52b0d91a30),
|
||||||
|
|
||||||
|
// Waiting for command data from server. Only used when dataPut is true in protocolClientCommandPut().
|
||||||
|
protocolClientStateCommandDataGet = STRID5("cmd-data-get", 0xa14fb0d024d91a30),
|
||||||
|
|
||||||
|
// Putting data to server. Only used when dataPut is true in protocolClientCommandPut().
|
||||||
|
protocolClientStateDataPut = STRID5("data-put", 0xa561b0d0240),
|
||||||
|
|
||||||
|
// Getting data from server
|
||||||
|
protocolClientStateDataGet = STRID5("data-get", 0xa14fb0d0240),
|
||||||
|
} ProtocolClientState;
|
||||||
|
|
||||||
/***********************************************************************************************************************************
|
/***********************************************************************************************************************************
|
||||||
Object type
|
Object type
|
||||||
***********************************************************************************************************************************/
|
***********************************************************************************************************************************/
|
||||||
struct ProtocolClient
|
struct ProtocolClient
|
||||||
{
|
{
|
||||||
ProtocolClientPub pub; // Publicly accessible variables
|
ProtocolClientPub pub; // Publicly accessible variables
|
||||||
|
ProtocolClientState state; // Current client state
|
||||||
IoWrite *write; // Write interface
|
IoWrite *write; // Write interface
|
||||||
const String *name; // Name displayed in logging
|
const String *name; // Name displayed in logging
|
||||||
const String *errorPrefix; // Prefix used when throwing error
|
const String *errorPrefix; // Prefix used when throwing error
|
||||||
@ -45,10 +67,13 @@ protocolClientFreeResource(THIS_VOID)
|
|||||||
|
|
||||||
ASSERT(this != NULL);
|
ASSERT(this != NULL);
|
||||||
|
|
||||||
|
// Switch state to idle so the command is sent no matter the current state
|
||||||
|
this->state = protocolClientStateIdle;
|
||||||
|
|
||||||
// Send an exit command but don't wait to see if it succeeds
|
// Send an exit command but don't wait to see if it succeeds
|
||||||
MEM_CONTEXT_TEMP_BEGIN()
|
MEM_CONTEXT_TEMP_BEGIN()
|
||||||
{
|
{
|
||||||
protocolClientCommandPut(this, protocolCommandNew(PROTOCOL_COMMAND_EXIT));
|
protocolClientCommandPut(this, protocolCommandNew(PROTOCOL_COMMAND_EXIT), false);
|
||||||
}
|
}
|
||||||
MEM_CONTEXT_TEMP_END();
|
MEM_CONTEXT_TEMP_END();
|
||||||
|
|
||||||
@ -82,6 +107,7 @@ protocolClientNew(const String *name, const String *service, IoRead *read, IoWri
|
|||||||
{
|
{
|
||||||
.read = read,
|
.read = read,
|
||||||
},
|
},
|
||||||
|
.state = protocolClientStateIdle,
|
||||||
.write = write,
|
.write = write,
|
||||||
.name = strDup(name),
|
.name = strDup(name),
|
||||||
.errorPrefix = strNewFmt("raised from %s", strZ(name)),
|
.errorPrefix = strNewFmt("raised from %s", strZ(name)),
|
||||||
@ -134,6 +160,23 @@ protocolClientNew(const String *name, const String *service, IoRead *read, IoWri
|
|||||||
FUNCTION_LOG_RETURN(PROTOCOL_CLIENT, this);
|
FUNCTION_LOG_RETURN(PROTOCOL_CLIENT, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/***********************************************************************************************************************************
|
||||||
|
Check protocol state
|
||||||
|
***********************************************************************************************************************************/
|
||||||
|
static void
|
||||||
|
protocolClientStateExpect(const ProtocolClient *const this, const ProtocolClientState expect)
|
||||||
|
{
|
||||||
|
FUNCTION_TEST_BEGIN();
|
||||||
|
FUNCTION_TEST_PARAM(PROTOCOL_CLIENT, this);
|
||||||
|
FUNCTION_TEST_PARAM(STRING_ID, expect);
|
||||||
|
FUNCTION_TEST_END();
|
||||||
|
|
||||||
|
if (this->state != expect)
|
||||||
|
THROW_FMT(ProtocolError, "client state is '%s' but expected '%s'", strZ(strIdToStr(this->state)), strZ(strIdToStr(expect)));
|
||||||
|
|
||||||
|
FUNCTION_TEST_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
/**********************************************************************************************************************************/
|
/**********************************************************************************************************************************/
|
||||||
void
|
void
|
||||||
protocolClientDataPut(ProtocolClient *const this, PackWrite *const data)
|
protocolClientDataPut(ProtocolClient *const this, PackWrite *const data)
|
||||||
@ -145,6 +188,9 @@ protocolClientDataPut(ProtocolClient *const this, PackWrite *const data)
|
|||||||
|
|
||||||
ASSERT(this != NULL);
|
ASSERT(this != NULL);
|
||||||
|
|
||||||
|
// Expect data-put state before data put
|
||||||
|
protocolClientStateExpect(this, protocolClientStateDataPut);
|
||||||
|
|
||||||
MEM_CONTEXT_TEMP_BEGIN()
|
MEM_CONTEXT_TEMP_BEGIN()
|
||||||
{
|
{
|
||||||
// End the pack
|
// End the pack
|
||||||
@ -159,7 +205,12 @@ protocolClientDataPut(ProtocolClient *const this, PackWrite *const data)
|
|||||||
|
|
||||||
// Flush when there is no more data to put
|
// Flush when there is no more data to put
|
||||||
if (data == NULL)
|
if (data == NULL)
|
||||||
|
{
|
||||||
ioWriteFlush(this->write);
|
ioWriteFlush(this->write);
|
||||||
|
|
||||||
|
// Switch state to data-get after successful data end put
|
||||||
|
this->state = protocolClientStateDataGet;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
MEM_CONTEXT_TEMP_END();
|
MEM_CONTEXT_TEMP_END();
|
||||||
|
|
||||||
@ -189,6 +240,9 @@ protocolClientError(ProtocolClient *const this, const ProtocolMessageType type,
|
|||||||
const String *const stack = pckReadStrP(error);
|
const String *const stack = pckReadStrP(error);
|
||||||
pckReadEndP(error);
|
pckReadEndP(error);
|
||||||
|
|
||||||
|
// Switch state to idle after error (server will do the same)
|
||||||
|
this->state = protocolClientStateIdle;
|
||||||
|
|
||||||
CHECK(message != NULL && stack != NULL);
|
CHECK(message != NULL && stack != NULL);
|
||||||
|
|
||||||
errorInternalThrow(type, __FILE__, __func__, __LINE__, strZ(message), strZ(stack));
|
errorInternalThrow(type, __FILE__, __func__, __LINE__, strZ(message), strZ(stack));
|
||||||
@ -209,6 +263,10 @@ protocolClientDataGet(ProtocolClient *const this)
|
|||||||
|
|
||||||
ASSERT(this != NULL);
|
ASSERT(this != NULL);
|
||||||
|
|
||||||
|
// Expect data-get state before data get
|
||||||
|
protocolClientStateExpect(
|
||||||
|
this, this->state == protocolClientStateCommandDataGet ? protocolClientStateCommandDataGet : protocolClientStateDataGet);
|
||||||
|
|
||||||
PackRead *result = NULL;
|
PackRead *result = NULL;
|
||||||
|
|
||||||
MEM_CONTEXT_TEMP_BEGIN()
|
MEM_CONTEXT_TEMP_BEGIN()
|
||||||
@ -227,6 +285,10 @@ protocolClientDataGet(ProtocolClient *const this)
|
|||||||
MEM_CONTEXT_PRIOR_END();
|
MEM_CONTEXT_PRIOR_END();
|
||||||
|
|
||||||
pckReadEndP(response);
|
pckReadEndP(response);
|
||||||
|
|
||||||
|
// Switch state to data-put after successful command data get
|
||||||
|
if (this->state == protocolClientStateCommandDataGet)
|
||||||
|
this->state = protocolClientStateDataPut;
|
||||||
}
|
}
|
||||||
MEM_CONTEXT_TEMP_END();
|
MEM_CONTEXT_TEMP_END();
|
||||||
|
|
||||||
@ -243,6 +305,9 @@ protocolClientDataEndGet(ProtocolClient *const this)
|
|||||||
|
|
||||||
ASSERT(this != NULL);
|
ASSERT(this != NULL);
|
||||||
|
|
||||||
|
// Expect data-get state before data end get
|
||||||
|
protocolClientStateExpect(this, protocolClientStateDataGet);
|
||||||
|
|
||||||
MEM_CONTEXT_TEMP_BEGIN()
|
MEM_CONTEXT_TEMP_BEGIN()
|
||||||
{
|
{
|
||||||
PackRead *response = pckReadNewIo(this->pub.read);
|
PackRead *response = pckReadNewIo(this->pub.read);
|
||||||
@ -253,6 +318,9 @@ protocolClientDataEndGet(ProtocolClient *const this)
|
|||||||
CHECK(type == protocolMessageTypeDataEnd);
|
CHECK(type == protocolMessageTypeDataEnd);
|
||||||
|
|
||||||
pckReadEndP(response);
|
pckReadEndP(response);
|
||||||
|
|
||||||
|
// Switch state to idle after successful data end get
|
||||||
|
this->state = protocolClientStateIdle;
|
||||||
}
|
}
|
||||||
MEM_CONTEXT_TEMP_END();
|
MEM_CONTEXT_TEMP_END();
|
||||||
|
|
||||||
@ -261,19 +329,29 @@ protocolClientDataEndGet(ProtocolClient *const this)
|
|||||||
|
|
||||||
/**********************************************************************************************************************************/
|
/**********************************************************************************************************************************/
|
||||||
void
|
void
|
||||||
protocolClientCommandPut(ProtocolClient *const this, ProtocolCommand *const command)
|
protocolClientCommandPut(ProtocolClient *const this, ProtocolCommand *const command, const bool dataPut)
|
||||||
{
|
{
|
||||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||||
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, this);
|
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, this);
|
||||||
FUNCTION_LOG_PARAM(PROTOCOL_COMMAND, command);
|
FUNCTION_LOG_PARAM(PROTOCOL_COMMAND, command);
|
||||||
|
FUNCTION_LOG_PARAM(BOOL, dataPut);
|
||||||
FUNCTION_LOG_END();
|
FUNCTION_LOG_END();
|
||||||
|
|
||||||
ASSERT(this != NULL);
|
ASSERT(this != NULL);
|
||||||
ASSERT(command != NULL);
|
ASSERT(command != NULL);
|
||||||
|
|
||||||
|
// Expect idle state before command put
|
||||||
|
protocolClientStateExpect(this, protocolClientStateIdle);
|
||||||
|
|
||||||
|
// Switch state to cmd-put
|
||||||
|
this->state = protocolClientStateDataPut;
|
||||||
|
|
||||||
// Put command
|
// Put command
|
||||||
protocolCommandPut(command, this->write);
|
protocolCommandPut(command, this->write);
|
||||||
|
|
||||||
|
// Switch state to data-get/data-put after successful command put
|
||||||
|
this->state = dataPut ? protocolClientStateCommandDataGet : protocolClientStateDataGet;
|
||||||
|
|
||||||
// Reset the keep alive time
|
// Reset the keep alive time
|
||||||
this->keepAliveTime = timeMSec();
|
this->keepAliveTime = timeMSec();
|
||||||
|
|
||||||
@ -294,7 +372,7 @@ protocolClientExecute(ProtocolClient *const this, ProtocolCommand *const command
|
|||||||
ASSERT(command != NULL);
|
ASSERT(command != NULL);
|
||||||
|
|
||||||
// Put command
|
// Put command
|
||||||
protocolClientCommandPut(this, command);
|
protocolClientCommandPut(this, command, false);
|
||||||
|
|
||||||
// Read result if required
|
// Read result if required
|
||||||
PackRead *result = NULL;
|
PackRead *result = NULL;
|
||||||
@ -331,5 +409,5 @@ protocolClientNoOp(ProtocolClient *this)
|
|||||||
String *
|
String *
|
||||||
protocolClientToLog(const ProtocolClient *this)
|
protocolClientToLog(const ProtocolClient *this)
|
||||||
{
|
{
|
||||||
return strNewFmt("{name: %s}", strZ(this->name));
|
return strNewFmt("{name: %s, state: %s}", strZ(this->name), strZ(strIdToStr(this->state)));
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ PackRead *protocolClientDataGet(ProtocolClient *this);
|
|||||||
void protocolClientDataEndGet(ProtocolClient *this);
|
void protocolClientDataEndGet(ProtocolClient *this);
|
||||||
|
|
||||||
// Put command to the server
|
// Put command to the server
|
||||||
void protocolClientCommandPut(ProtocolClient *this, ProtocolCommand *command);
|
void protocolClientCommandPut(ProtocolClient *this, ProtocolCommand *command, const bool dataPut);
|
||||||
|
|
||||||
// Put data to the server
|
// Put data to the server
|
||||||
void protocolClientDataPut(ProtocolClient *this, PackWrite *data);
|
void protocolClientDataPut(ProtocolClient *this, PackWrite *data);
|
||||||
|
@ -209,7 +209,7 @@ protocolParallelProcess(ProtocolParallel *this)
|
|||||||
|
|
||||||
// Put command
|
// Put command
|
||||||
protocolClientCommandPut(
|
protocolClientCommandPut(
|
||||||
*(ProtocolClient **)lstGet(this->clientList, clientIdx), protocolParallelJobCommand(job));
|
*(ProtocolClient **)lstGet(this->clientList, clientIdx), protocolParallelJobCommand(job), false);
|
||||||
|
|
||||||
// Set client id and running state
|
// Set client id and running state
|
||||||
protocolParallelJobProcessIdSet(job, clientIdx + 1);
|
protocolParallelJobProcessIdSet(job, clientIdx + 1);
|
||||||
|
@ -77,7 +77,7 @@ storageReadRemoteOpen(THIS_VOID)
|
|||||||
pckWriteStrP(param, jsonFromVar(this->interface.limit));
|
pckWriteStrP(param, jsonFromVar(this->interface.limit));
|
||||||
pckWritePackP(param, ioFilterGroupParamAll(ioReadFilterGroup(storageReadIo(this->read))));
|
pckWritePackP(param, ioFilterGroupParamAll(ioReadFilterGroup(storageReadIo(this->read))));
|
||||||
|
|
||||||
protocolClientCommandPut(this->client, command);
|
protocolClientCommandPut(this->client, command, false);
|
||||||
|
|
||||||
// If the file exists
|
// If the file exists
|
||||||
result = pckReadBoolP(protocolClientDataGet(this->client));
|
result = pckReadBoolP(protocolClientDataGet(this->client));
|
||||||
|
@ -142,7 +142,7 @@ storageRemoteInfo(THIS_VOID, const String *file, StorageInfoLevel level, Storage
|
|||||||
pckWriteBoolP(commandParam, param.followLink);
|
pckWriteBoolP(commandParam, param.followLink);
|
||||||
|
|
||||||
// Put command
|
// Put command
|
||||||
protocolClientCommandPut(this->client, command);
|
protocolClientCommandPut(this->client, command, false);
|
||||||
|
|
||||||
// Read info from protocol
|
// Read info from protocol
|
||||||
PackRead *read = protocolClientDataGet(this->client);
|
PackRead *read = protocolClientDataGet(this->client);
|
||||||
@ -200,7 +200,7 @@ storageRemoteInfoList(
|
|||||||
pckWriteU32P(commandParam, level);
|
pckWriteU32P(commandParam, level);
|
||||||
|
|
||||||
// Put command
|
// Put command
|
||||||
protocolClientCommandPut(this->client, command);
|
protocolClientCommandPut(this->client, command, false);
|
||||||
|
|
||||||
// Read list
|
// Read list
|
||||||
StorageRemoteInfoData parseData = {.memContext = memContextCurrent()};
|
StorageRemoteInfoData parseData = {.memContext = memContextCurrent()};
|
||||||
|
@ -93,7 +93,7 @@ storageWriteRemoteOpen(THIS_VOID)
|
|||||||
pckWriteBoolP(param, this->interface.atomic);
|
pckWriteBoolP(param, this->interface.atomic);
|
||||||
pckWritePackP(param, ioFilterGroupParamAll(ioWriteFilterGroup(storageWriteIo(this->write))));
|
pckWritePackP(param, ioFilterGroupParamAll(ioWriteFilterGroup(storageWriteIo(this->write))));
|
||||||
|
|
||||||
protocolClientCommandPut(this->client, command);
|
protocolClientCommandPut(this->client, command, true);
|
||||||
protocolClientDataGet(this->client);
|
protocolClientDataGet(this->client);
|
||||||
|
|
||||||
// Clear filters since they will be run on the remote side
|
// Clear filters since they will be run on the remote side
|
||||||
|
@ -281,7 +281,7 @@ testRun(void)
|
|||||||
OBJ_NEW_BEGIN(ProtocolClient)
|
OBJ_NEW_BEGIN(ProtocolClient)
|
||||||
{
|
{
|
||||||
protocolHelperClient.client = OBJ_NEW_ALLOC();
|
protocolHelperClient.client = OBJ_NEW_ALLOC();
|
||||||
*protocolHelperClient.client = (ProtocolClient){.name = STRDEF("test")};
|
*protocolHelperClient.client = (ProtocolClient){.name = STRDEF("test"), .state = protocolClientStateIdle};
|
||||||
memContextCallbackSet(memContextCurrent(), protocolClientFreeResource, protocolHelperClient.client);
|
memContextCallbackSet(memContextCurrent(), protocolClientFreeResource, protocolHelperClient.client);
|
||||||
}
|
}
|
||||||
OBJ_NEW_END();
|
OBJ_NEW_END();
|
||||||
@ -635,7 +635,11 @@ testRun(void)
|
|||||||
TEST_ASSIGN(command, protocolCommandNew(TEST_PROTOCOL_COMMAND_COMPLEX), "command");
|
TEST_ASSIGN(command, protocolCommandNew(TEST_PROTOCOL_COMMAND_COMPLEX), "command");
|
||||||
TEST_RESULT_VOID(pckWriteU32P(protocolCommandParam(command), 87), "param");
|
TEST_RESULT_VOID(pckWriteU32P(protocolCommandParam(command), 87), "param");
|
||||||
TEST_RESULT_VOID(pckWriteStrP(protocolCommandParam(command), STRDEF("data")), "param");
|
TEST_RESULT_VOID(pckWriteStrP(protocolCommandParam(command), STRDEF("data")), "param");
|
||||||
TEST_RESULT_VOID(protocolClientCommandPut(client, command), "command put");
|
TEST_RESULT_VOID(protocolClientCommandPut(client, command, true), "command put");
|
||||||
|
|
||||||
|
TEST_ERROR(
|
||||||
|
protocolClientStateExpect(client, protocolClientStateIdle), ProtocolError,
|
||||||
|
"client state is 'cmd-data-get' but expected 'idle'");
|
||||||
|
|
||||||
// Read null data to indicate that the server has started the command and is read to receive data
|
// Read null data to indicate that the server has started the command and is read to receive data
|
||||||
TEST_RESULT_PTR(protocolClientDataGet(client), NULL, "command started and ready for data");
|
TEST_RESULT_PTR(protocolClientDataGet(client), NULL, "command started and ready for data");
|
||||||
@ -799,7 +803,12 @@ testRun(void)
|
|||||||
TEST_TITLE("error on add without an fd");
|
TEST_TITLE("error on add without an fd");
|
||||||
|
|
||||||
// Fake a client without a read fd
|
// Fake a client without a read fd
|
||||||
ProtocolClient clientError = {.pub = {.read = ioBufferReadNew(bufNew(0))}, .name = STRDEF("test")};
|
ProtocolClient clientError =
|
||||||
|
{
|
||||||
|
.pub = {.read = ioBufferReadNew(bufNew(0))},
|
||||||
|
.name = STRDEF("test"),
|
||||||
|
.state = protocolClientStateIdle,
|
||||||
|
};
|
||||||
|
|
||||||
TEST_ERROR(protocolParallelClientAdd(parallel, &clientError), AssertError, "client with read fd is required");
|
TEST_ERROR(protocolParallelClientAdd(parallel, &clientError), AssertError, "client with read fd is required");
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user