1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-18 04:58:51 +02:00

Improve multi-host handling in protocol helper.

Multiple PostgreSQL hosts were supported via the host-id option but there are cases where it is useful to be able to directly specify the host id required, e.g. to iterate through pg* hosts when looking for candidate primaries and standbys during backup.
This commit is contained in:
David Steele 2019-07-31 20:44:49 -04:00
parent 893ae24284
commit 89c67287bc
7 changed files with 55 additions and 49 deletions

View File

@ -244,7 +244,7 @@ protocolClientExecute(ProtocolClient *this, const ProtocolCommand *command, bool
}
/***********************************************************************************************************************************
Move the file object to a new context
Move the protocol client object to a new context
***********************************************************************************************************************************/
ProtocolClient *
protocolClientMove(ProtocolClient *this, MemContext *parentNew)

View File

@ -72,20 +72,13 @@ repoIsLocal(void)
Is pg local?
***********************************************************************************************************************************/
bool
pgIsLocal(void)
pgIsLocal(unsigned int hostId)
{
FUNCTION_TEST_VOID();
FUNCTION_TEST_RETURN(!cfgOptionTest(cfgOptPgHost + (cfgOptionTest(cfgOptHostId) ? cfgOptionUInt(cfgOptHostId) - 1 : 0)));
}
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(UINT, hostId);
FUNCTION_LOG_END();
/***********************************************************************************************************************************
Get host id if host-id option is set, otherwise return default id 1
***********************************************************************************************************************************/
unsigned int
protocolHostId(void)
{
FUNCTION_TEST_VOID();
FUNCTION_TEST_RETURN(cfgOptionTest(cfgOptHostId) ? cfgOptionUInt(cfgOptHostId) : 1);
FUNCTION_LOG_RETURN(BOOL, !cfgOptionTest(cfgOptPgHost + hostId - 1));
}
/***********************************************************************************************************************************
@ -191,19 +184,17 @@ protocolLocalGet(ProtocolStorageType protocolStorageType, unsigned int protocolI
Get the command line required for remote protocol execution
***********************************************************************************************************************************/
static StringList *
protocolRemoteParam(ProtocolStorageType protocolStorageType, unsigned int protocolId)
protocolRemoteParam(ProtocolStorageType protocolStorageType, unsigned int protocolId, unsigned int hostIdx)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(ENUM, protocolStorageType);
FUNCTION_LOG_PARAM(UINT, protocolId);
FUNCTION_LOG_PARAM(UINT, hostIdx);
FUNCTION_LOG_END();
// Is this a repo remote?
bool isRepo = protocolStorageType == protocolStorageTypeRepo;
// Get the host index. Default to 0 if host-id is not set.
unsigned int hostIdx = isRepo ? 0 : protocolHostId() - 1;
// Fixed parameters for ssh command
StringList *result = strLstNew();
strLstAddZ(result, "-o");
@ -306,18 +297,16 @@ protocolRemoteParam(ProtocolStorageType protocolStorageType, unsigned int protoc
Get the remote protocol client
***********************************************************************************************************************************/
ProtocolClient *
protocolRemoteGet(ProtocolStorageType protocolStorageType)
protocolRemoteGet(ProtocolStorageType protocolStorageType, unsigned int hostId)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(ENUM, protocolStorageType);
FUNCTION_LOG_PARAM(UINT, hostId);
FUNCTION_LOG_END();
// Is this a repo remote?
bool isRepo = protocolStorageType == protocolStorageTypeRepo;
// Get the host index. Default to 0 if host-id is not set.
unsigned int hostIdx = isRepo ? 0 : protocolHostId() - 1;
protocolHelperInit();
// Allocate the client cache
@ -337,11 +326,13 @@ protocolRemoteGet(ProtocolStorageType protocolStorageType)
MEM_CONTEXT_END();
}
// Determine protocol id for the remote. If the process option is set then use that since we want to remote protocol id to
// match the local protocol id (but we'll still save it in position 0 or we'd need to allocated up to process-max slots).
// Otherwise set to 0 since the remote is being started from a main process.
// Determine protocol id for the remote. If the process option is set then use that since we want the remote protocol id to
// match the local protocol id. Otherwise set to 0 since the remote is being started from a main process and there should only
// be one remote per host.
unsigned int protocolId = 0;
unsigned int protocolIdx = 0;
// Use hostId to determine where to cache to remote
unsigned int protocolIdx = hostId - 1;
if (cfgOptionTest(cfgOptProcess))
protocolId = cfgOptionUInt(cfgOptProcess);
@ -355,11 +346,11 @@ protocolRemoteGet(ProtocolStorageType protocolStorageType)
{
MEM_CONTEXT_BEGIN(protocolHelper.memContext)
{
unsigned int optHost = isRepo ? cfgOptRepoHost : cfgOptPgHost + hostIdx;
unsigned int optHost = isRepo ? cfgOptRepoHost : cfgOptPgHost + hostId - 1;
// Execute the protocol command
protocolHelperClient->exec = execNew(
cfgOptionStr(cfgOptCmdSsh), protocolRemoteParam(protocolStorageType, protocolId),
cfgOptionStr(cfgOptCmdSsh), protocolRemoteParam(protocolStorageType, protocolId, hostId - 1),
strNewFmt(PROTOCOL_SERVICE_REMOTE "-%u process on '%s'", protocolId, strPtr(cfgOptionStr(optHost))),
(TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * 1000));
execOpen(protocolHelperClient->exec);

View File

@ -28,13 +28,12 @@ Functions
***********************************************************************************************************************************/
void protocolKeepAlive(void);
ProtocolClient *protocolLocalGet(ProtocolStorageType protocolStorageType, unsigned int protocolId);
ProtocolClient *protocolRemoteGet(ProtocolStorageType protocolStorageType);
ProtocolClient *protocolRemoteGet(ProtocolStorageType protocolStorageType, unsigned int hostId);
/***********************************************************************************************************************************
Getters
***********************************************************************************************************************************/
bool pgIsLocal(void);
unsigned int protocolHostId(void);
bool pgIsLocal(unsigned int hostId);
bool repoIsLocal(void);
/***********************************************************************************************************************************

View File

@ -157,8 +157,13 @@ protocolServerProcess(ProtocolServer *this)
// Get the next handler
ProtocolServerProcessHandler handler = *(ProtocolServerProcessHandler *)lstGet(this->handlerList, handlerIdx);
// Send the command to the handler
found = handler(command, paramList, this);
// Send the command to the handler. Run the handler in the server's memory context in case any persistent data
// needs to be stored by the handler.
MEM_CONTEXT_BEGIN(this->memContext)
{
found = handler(command, paramList, this);
}
MEM_CONTEXT_END();
// If the handler processed the command then exit the handler loop
if (found)

View File

@ -150,18 +150,21 @@ storagePgGet(bool write)
Storage *result = NULL;
// Determine which host to use
unsigned int hostId = cfgOptionTest(cfgOptHostId) ? cfgOptionUInt(cfgOptHostId) : 1;
// Use remote storage
if (!pgIsLocal())
if (!pgIsLocal(hostId))
{
result = storageRemoteNew(
STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, write, NULL,
protocolRemoteGet(protocolStorageTypePg), cfgOptionUInt(cfgOptCompressLevelNetwork));
protocolRemoteGet(protocolStorageTypePg, hostId), cfgOptionUInt(cfgOptCompressLevelNetwork));
}
// Use Posix storage
else
{
result = storagePosixNew(
cfgOptionStr(cfgOptPgPath + protocolHostId() - 1), STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, write, NULL);
cfgOptionStr(cfgOptPgPath + hostId - 1), STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, write, NULL);
}
FUNCTION_TEST_RETURN(result);
@ -304,7 +307,7 @@ storageRepoGet(const String *type, bool write)
{
result = storageRemoteNew(
STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, write, storageRepoPathExpression,
protocolRemoteGet(protocolStorageTypeRepo), cfgOptionUInt(cfgOptCompressLevelNetwork));
protocolRemoteGet(protocolStorageTypeRepo, 1), cfgOptionUInt(cfgOptCompressLevelNetwork));
}
// Use CIFS storage
else if (strEqZ(type, STORAGE_TYPE_CIFS))

View File

@ -95,7 +95,7 @@ testRun(void)
strLstAddZ(argList, "backup");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_BOOL(pgIsLocal(), true, "pg is local");
TEST_RESULT_BOOL(pgIsLocal(1), true, "pg is local");
// -------------------------------------------------------------------------------------------------------------------------
argList = strLstNew();
@ -110,7 +110,7 @@ testRun(void)
strLstAddZ(argList, "local");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_BOOL(pgIsLocal(), false, "pg is remote");
TEST_RESULT_BOOL(pgIsLocal(7), false, "pg is remote");
}
// *****************************************************************************************************************************
@ -165,7 +165,7 @@ testRun(void)
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_STR(
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypeRepo, 0), "|")),
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypeRepo, 0, 0), "|")),
strPtr(
strNew(
"-o|LogLevel=error|-o|Compression=no|-o|PasswordAuthentication=no|repo-host-user@repo-host"
@ -188,7 +188,7 @@ testRun(void)
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_STR(
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypeRepo, 1), "|")),
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypeRepo, 1, 0), "|")),
strPtr(
strNew(
"-o|LogLevel=error|-o|Compression=no|-o|PasswordAuthentication=no|-p|444|repo-host-user@repo-host"
@ -210,7 +210,7 @@ testRun(void)
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_STR(
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypeRepo, 66), "|")),
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypeRepo, 66, 0), "|")),
strPtr(
strNew(
"-o|LogLevel=error|-o|Compression=no|-o|PasswordAuthentication=no|pgbackrest@repo-host"
@ -229,7 +229,7 @@ testRun(void)
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_STR(
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypePg, 1), "|")),
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypePg, 1, 0), "|")),
strPtr(
strNew(
"-o|LogLevel=error|-o|Compression=no|-o|PasswordAuthentication=no|postgres@pg1-host"
@ -252,7 +252,7 @@ testRun(void)
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_STR(
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypePg, 1), "|")),
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypePg, 1, 1), "|")),
strPtr(
strNew(
"-o|LogLevel=error|-o|Compression=no|-o|PasswordAuthentication=no|postgres@pg2-host"
@ -277,7 +277,7 @@ testRun(void)
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_STR(
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypePg, 1), "|")),
strPtr(strLstJoin(protocolRemoteParam(protocolStorageTypePg, 1, 2), "|")),
strPtr(
strNew(
"-o|LogLevel=error|-o|Compression=no|-o|PasswordAuthentication=no|postgres@pg3-host"
@ -763,8 +763,8 @@ testRun(void)
TEST_RESULT_VOID(protocolFree(), "free protocol objects before anything has been created");
TEST_ASSIGN(client, protocolRemoteGet(protocolStorageTypeRepo), "get remote protocol");
TEST_RESULT_PTR(protocolRemoteGet(protocolStorageTypeRepo), client, "get remote cached protocol");
TEST_ASSIGN(client, protocolRemoteGet(protocolStorageTypeRepo, 1), "get remote protocol");
TEST_RESULT_PTR(protocolRemoteGet(protocolStorageTypeRepo, 1), client, "get remote cached protocol");
TEST_RESULT_PTR(protocolHelper.clientRemote[0].client, client, "check position in cache");
TEST_RESULT_VOID(protocolKeepAlive(), "keep alive");
TEST_RESULT_VOID(protocolFree(), "free remote protocol objects");
@ -794,7 +794,7 @@ testRun(void)
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_STR(strPtr(cfgOptionStr(cfgOptRepoCipherPass)), "acbd", "check cipher pass before");
TEST_ASSIGN(client, protocolRemoteGet(protocolStorageTypeRepo), "get remote protocol");
TEST_ASSIGN(client, protocolRemoteGet(protocolStorageTypeRepo, 1), "get remote protocol");
TEST_RESULT_PTR(protocolHelper.clientRemote[0].client, client, "check position in cache");
TEST_RESULT_STR(strPtr(cfgOptionStr(cfgOptRepoCipherPass)), "acbd", "check cipher pass after");
@ -820,7 +820,7 @@ testRun(void)
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_RESULT_PTR(cfgOptionStr(cfgOptRepoCipherPass), NULL, "check cipher pass before");
TEST_ASSIGN(client, protocolRemoteGet(protocolStorageTypeRepo), "get remote protocol");
TEST_ASSIGN(client, protocolRemoteGet(protocolStorageTypeRepo, 1), "get remote protocol");
TEST_RESULT_STR(strPtr(cfgOptionStr(cfgOptRepoCipherPass)), "dcba", "check cipher pass after");
TEST_RESULT_VOID(protocolFree(), "free remote protocol objects");
@ -838,7 +838,7 @@ testRun(void)
strLstAddZ(argList, "backup");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_ASSIGN(client, protocolRemoteGet(protocolStorageTypePg), "get remote protocol");
TEST_ASSIGN(client, protocolRemoteGet(protocolStorageTypePg, 1), "get remote protocol");
// Start local protocol
// -------------------------------------------------------------------------------------------------------------------------

View File

@ -1084,6 +1084,7 @@ testRun(void)
strLstAddZ(argList, "--archive-async");
strLstAdd(argList, strNewFmt("--spool-path=%s", testPath()));
strLstAdd(argList, strNewFmt("--pg1-path=%s/db", testPath()));
strLstAdd(argList, strNewFmt("--pg2-path=%s/db2", testPath()));
strLstAddZ(argList, "archive-get");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
@ -1137,6 +1138,13 @@ testRun(void)
TEST_RESULT_STR(strPtr(storage->path), strPtr(strNewFmt("%s/db", testPath())), "check pg write storage path");
TEST_RESULT_BOOL(storage->write, true, "check pg write storage write");
// Pg storage from another host id
// -------------------------------------------------------------------------------------------------------------------------
cfgOptionSet(cfgOptHostId, cfgSourceParam, VARUINT64(2));
cfgOptionValidSet(cfgOptHostId, true);
TEST_RESULT_STR(strPtr(storagePgGet(false)->path), strPtr(strNewFmt("%s/db2", testPath())), "check pg-2 storage path");
// Change the stanza to NULL, stanzaInit flag to false and make sure helper fails because stanza is required
// -------------------------------------------------------------------------------------------------------------------------
storageHelper.storageSpool = NULL;