1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-10-30 23:37:45 +02:00

Use a callback to feed jobs to ProtocolParallel.

Loading jobs in advance uses a lot of memory in the case that there are millions of jobs to be performed.  We haven't seen this yet, but with backup and restore on the horizon it will become the norm.

Instead, use a callback so that jobs are only created as they are needed and can be freed as soon as they are completed.
This commit is contained in:
David Steele
2019-09-18 07:15:16 -04:00
parent ce1c7b0252
commit 60d93df503
5 changed files with 191 additions and 91 deletions

View File

@@ -286,6 +286,39 @@ cmdArchiveGet(void)
/***********************************************************************************************************************************
Async version of archive get that runs in parallel for performance
***********************************************************************************************************************************/
// State handed to archiveGetAsyncCallback() through the opaque callback data pointer. The callback reads walSegmentList and advances
// walSegmentIdx by one each time it hands out a job, so the index always identifies the next segment that has not yet been queued.
typedef struct ArchiveGetAsyncData
{
const StringList *walSegmentList; // List of wal segments to process
unsigned int walSegmentIdx; // Current index in the list to be processed
} ArchiveGetAsyncData;
// Job callback for async archive-get. Hands out one job per WAL segment, in list order, and returns NULL once every segment in the
// list has been handed out. data must point to an ArchiveGetAsyncData; its walSegmentIdx is advanced for each job returned.
static ProtocolParallelJob *
archiveGetAsyncCallback(void *data, unsigned int clientIdx)
{
    FUNCTION_TEST_BEGIN();
        FUNCTION_TEST_PARAM_P(VOID, data);
        FUNCTION_TEST_PARAM(UINT, clientIdx);
    FUNCTION_TEST_END();

    // Jobs are not distributed by client, so the client index is intentionally unused
    (void)clientIdx;

    ArchiveGetAsyncData *const state = data;

    // Nothing left to hand out
    if (state->walSegmentIdx >= strLstSize(state->walSegmentList))
    {
        FUNCTION_TEST_RETURN(NULL);
    }

    // Take the next segment and step past it so the following call gets a different one
    const String *const segment = strLstGet(state->walSegmentList, state->walSegmentIdx);
    state->walSegmentIdx++;

    // The segment name is both the only command parameter and the job key
    ProtocolCommand *const getCommand = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_GET_STR);
    protocolCommandParamAdd(getCommand, VARSTR(segment));

    FUNCTION_TEST_RETURN(protocolParallelJobNew(VARSTR(segment), getCommand));
}
void
cmdArchiveGetAsync(void)
{
@@ -296,34 +329,25 @@ cmdArchiveGetAsync(void)
TRY_BEGIN()
{
// Check the parameters
const StringList *walSegmentList = cfgCommandParam();
ArchiveGetAsyncData jobData = {.walSegmentList = cfgCommandParam()};
if (strLstSize(walSegmentList) < 1)
if (strLstSize(jobData.walSegmentList) < 1)
THROW(ParamInvalidError, "at least one wal segment is required");
LOG_INFO(
"get %u WAL file(s) from archive: %s%s", strLstSize(walSegmentList), strPtr(strLstGet(walSegmentList, 0)),
strLstSize(walSegmentList) == 1 ?
"" : strPtr(strNewFmt("...%s", strPtr(strLstGet(walSegmentList, strLstSize(walSegmentList) - 1)))));
"get %u WAL file(s) from archive: %s%s",
strLstSize(jobData.walSegmentList), strPtr(strLstGet(jobData.walSegmentList, 0)),
strLstSize(jobData.walSegmentList) == 1 ?
"" :
strPtr(strNewFmt("...%s", strPtr(strLstGet(jobData.walSegmentList, strLstSize(jobData.walSegmentList) - 1)))));
// Create the parallel executor
ProtocolParallel *parallelExec = protocolParallelNew(
(TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * MSEC_PER_SEC) / 2);
(TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * MSEC_PER_SEC) / 2, archiveGetAsyncCallback, &jobData);
for (unsigned int processIdx = 1; processIdx <= cfgOptionUInt(cfgOptProcessMax); processIdx++)
protocolParallelClientAdd(parallelExec, protocolLocalGet(protocolStorageTypeRepo, processIdx));
// Queue jobs in executor
for (unsigned int walSegmentIdx = 0; walSegmentIdx < strLstSize(walSegmentList); walSegmentIdx++)
{
const String *walSegment = strLstGet(walSegmentList, walSegmentIdx);
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_GET_STR);
protocolCommandParamAdd(command, VARSTR(walSegment));
protocolParallelJobAdd(parallelExec, protocolParallelJobNew(VARSTR(walSegment), command));
}
// Process jobs
do
{
@@ -362,6 +386,8 @@ cmdArchiveGetAsync(void)
archiveAsyncStatusErrorWrite(
archiveModeGet, walSegment, protocolParallelJobErrorCode(job), protocolParallelJobErrorMessage(job));
}
protocolParallelJobFree(job);
}
}
while (!protocolParallelDone(parallelExec));

View File

@@ -383,6 +383,52 @@ cmdArchivePush(void)
/***********************************************************************************************************************************
Async version of archive push that runs in parallel for performance
***********************************************************************************************************************************/
// State handed to archivePushAsyncCallback() through the opaque callback data pointer. Holds everything needed to build an
// archive-push command for each WAL file so the callback does not have to look the values up again for every job. The callback
// advances walFileIdx by one each time it hands out a job.
typedef struct ArchivePushAsyncData
{
const String *walPath; // Path to pg_wal/pg_xlog
const StringList *walFileList; // List of wal files to process
unsigned int walFileIdx; // Current index in the list to be processed
CipherType cipherType; // Cipher type
bool compress; // Compress wal files
int compressLevel; // Compression level for wal files
ArchivePushCheckResult archiveInfo; // Archive info
} ArchivePushAsyncData;
static ProtocolParallelJob *archivePushAsyncCallback(void *data, unsigned int clientIdx)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM_P(VOID, data);
FUNCTION_TEST_PARAM(UINT, clientIdx);
FUNCTION_TEST_END();
// No special logic based on the client, we'll just get the next job
(void)clientIdx;
// Get a new job if there are any left
ArchivePushAsyncData *jobData = data;
if (jobData->walFileIdx < strLstSize(jobData->walFileList))
{
const String *walFile = strLstGet(jobData->walFileList, jobData->walFileIdx);
jobData->walFileIdx++;
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_PUSH_STR);
protocolCommandParamAdd(command, VARSTR(strNewFmt("%s/%s", strPtr(jobData->walPath), strPtr(walFile))));
protocolCommandParamAdd(command, VARSTR(jobData->archiveInfo.archiveId));
protocolCommandParamAdd(command, VARUINT(jobData->archiveInfo.pgVersion));
protocolCommandParamAdd(command, VARUINT64(jobData->archiveInfo.pgSystemId));
protocolCommandParamAdd(command, VARSTR(walFile));
protocolCommandParamAdd(command, VARUINT(jobData->cipherType));
protocolCommandParamAdd(command, VARSTR(jobData->archiveInfo.archiveCipherPass));
protocolCommandParamAdd(command, VARBOOL(jobData->compress));
protocolCommandParamAdd(command, VARINT(jobData->compressLevel));
FUNCTION_TEST_RETURN(protocolParallelJobNew(VARSTR(walFile), command));
}
FUNCTION_TEST_RETURN(NULL);
}
void
cmdArchivePushAsync(void)
{
@@ -398,7 +444,12 @@ cmdArchivePushAsync(void)
if (strLstSize(commandParam) != 1)
THROW(ParamRequiredError, "WAL path to push required");
const String *walPath = strLstGet(commandParam, 0);
ArchivePushAsyncData jobData =
{
.walPath = strLstGet(commandParam, 0),
.compress = cfgOptionBool(cfgOptCompress),
.compressLevel = cfgOptionInt(cfgOptCompressLevel),
};
TRY_BEGIN()
{
@@ -406,23 +457,23 @@ cmdArchivePushAsync(void)
lockStopTest();
// Get a list of WAL files that are ready for processing
StringList *walFileList = archivePushProcessList(walPath);
jobData.walFileList = archivePushProcessList(jobData.walPath);
// The archive-push-async command should not have been called unless there are WAL files to process
if (strLstSize(walFileList) == 0)
if (strLstSize(jobData.walFileList) == 0)
THROW(AssertError, "no WAL files to process");
LOG_INFO(
"push %u WAL file(s) to archive: %s%s", strLstSize(walFileList), strPtr(strLstGet(walFileList, 0)),
strLstSize(walFileList) == 1 ?
"" : strPtr(strNewFmt("...%s", strPtr(strLstGet(walFileList, strLstSize(walFileList) - 1)))));
"push %u WAL file(s) to archive: %s%s", strLstSize(jobData.walFileList), strPtr(strLstGet(jobData.walFileList, 0)),
strLstSize(jobData.walFileList) == 1 ?
"" : strPtr(strNewFmt("...%s", strPtr(strLstGet(jobData.walFileList, strLstSize(jobData.walFileList) - 1)))));
// Drop files if queue max has been exceeded
if (cfgOptionTest(cfgOptArchivePushQueueMax) && archivePushDrop(walPath, walFileList))
if (cfgOptionTest(cfgOptArchivePushQueueMax) && archivePushDrop(jobData.walPath, jobData.walFileList))
{
for (unsigned int walFileIdx = 0; walFileIdx < strLstSize(walFileList); walFileIdx++)
for (unsigned int walFileIdx = 0; walFileIdx < strLstSize(jobData.walFileList); walFileIdx++)
{
const String *walFile = strLstGet(walFileList, walFileIdx);
const String *walFile = strLstGet(jobData.walFileList, walFileIdx);
const String *warning = archivePushDropWarning(walFile, cfgOptionUInt64(cfgOptArchivePushQueueMax));
archiveAsyncStatusOkWrite(archiveModePush, walFile, warning);
@@ -435,38 +486,20 @@ cmdArchivePushAsync(void)
// Get the repo storage in case it is remote and encryption settings need to be pulled down
storageRepo();
// Get cipher type
jobData.cipherType = cipherType(cfgOptionStr(cfgOptRepoCipherType));
// Get archive info
ArchivePushCheckResult archiveInfo = archivePushCheck(
jobData.archiveInfo = archivePushCheck(
cipherType(cfgOptionStr(cfgOptRepoCipherType)), cfgOptionStr(cfgOptRepoCipherPass));
// Create the parallel executor
ProtocolParallel *parallelExec = protocolParallelNew(
(TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * MSEC_PER_SEC) / 2);
(TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * MSEC_PER_SEC) / 2, archivePushAsyncCallback, &jobData);
for (unsigned int processIdx = 1; processIdx <= cfgOptionUInt(cfgOptProcessMax); processIdx++)
protocolParallelClientAdd(parallelExec, protocolLocalGet(protocolStorageTypeRepo, processIdx));
// Queue jobs in executor
for (unsigned int walFileIdx = 0; walFileIdx < strLstSize(walFileList); walFileIdx++)
{
protocolKeepAlive();
const String *walFile = strLstGet(walFileList, walFileIdx);
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_PUSH_STR);
protocolCommandParamAdd(command, VARSTR(strNewFmt("%s/%s", strPtr(walPath), strPtr(walFile))));
protocolCommandParamAdd(command, VARSTR(archiveInfo.archiveId));
protocolCommandParamAdd(command, VARUINT(archiveInfo.pgVersion));
protocolCommandParamAdd(command, VARUINT64(archiveInfo.pgSystemId));
protocolCommandParamAdd(command, VARSTR(walFile));
protocolCommandParamAdd(command, VARUINT(cipherType(cfgOptionStr(cfgOptRepoCipherType))));
protocolCommandParamAdd(command, VARSTR(archiveInfo.archiveCipherPass));
protocolCommandParamAdd(command, VARBOOL(cfgOptionBool(cfgOptCompress)));
protocolCommandParamAdd(command, VARINT(cfgOptionInt(cfgOptCompressLevel)));
protocolParallelJobAdd(parallelExec, protocolParallelJobNew(VARSTR(walFile), command));
}
// Process jobs
do
{
@@ -498,6 +531,8 @@ cmdArchivePushAsync(void)
archiveAsyncStatusErrorWrite(
archiveModePush, walFile, protocolParallelJobErrorCode(job), protocolParallelJobErrorMessage(job));
}
protocolParallelJobFree(job);
}
}
while (!protocolParallelDone(parallelExec));

View File

@@ -23,6 +23,8 @@ struct ProtocolParallel
{
MemContext *memContext;
TimeMSec timeout; // Max time to wait for jobs before returning
ParallelJobCallback *callbackFunction; // Function to get new jobs
void *callbackData; // Data to pass to callback function
List *clientList; // List of clients to process jobs
List *jobList; // List of jobs to be processed
@@ -38,12 +40,17 @@ OBJECT_DEFINE_FREE(PROTOCOL_PARALLEL);
Create object
***********************************************************************************************************************************/
ProtocolParallel *
protocolParallelNew(TimeMSec timeout)
protocolParallelNew(TimeMSec timeout, ParallelJobCallback *callbackFunction, void *callbackData)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(UINT64, timeout);
FUNCTION_LOG_PARAM(FUNCTIONP, callbackFunction);
FUNCTION_LOG_PARAM_P(VOID, callbackData);
FUNCTION_LOG_END();
ASSERT(callbackFunction != NULL);
ASSERT(callbackData != NULL);
ProtocolParallel *this = NULL;
MEM_CONTEXT_NEW_BEGIN("ProtocolParallel")
@@ -52,6 +59,9 @@ protocolParallelNew(TimeMSec timeout)
this->memContext = memContextCurrent();
this->timeout = timeout;
this->callbackFunction = callbackFunction;
this->callbackData = callbackData;
this->clientList = lstNew(sizeof(ProtocolClient *));
this->jobList = lstNew(sizeof(ProtocolParallelJob *));
this->state = protocolParallelJobStatePending;
@@ -84,27 +94,6 @@ protocolParallelClientAdd(ProtocolParallel *this, ProtocolClient *client)
FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Add job
***********************************************************************************************************************************/
void
protocolParallelJobAdd(ProtocolParallel *this, ProtocolParallelJob *job)
{
    FUNCTION_LOG_BEGIN(logLevelTrace);
        FUNCTION_LOG_PARAM(PROTOCOL_PARALLEL, this);
        FUNCTION_LOG_PARAM(PROTOCOL_PARALLEL_JOB, job);
    FUNCTION_LOG_END();

    ASSERT(this != NULL);
    ASSERT(job != NULL);
    // Jobs may only be added while the executor is still in the pending state
    ASSERT(this->state == protocolParallelJobStatePending);

    // Move the job into the job list's mem context, then append it to the list
    List *const queue = this->jobList;

    protocolParallelJobMove(job, lstMemContext(queue));
    lstAdd(queue, &job);

    FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Process jobs
***********************************************************************************************************************************/
@@ -208,21 +197,29 @@ protocolParallelProcess(ProtocolParallel *this)
// If nothing is running for this client
if (this->clientJobList[clientIdx] == NULL)
{
for (unsigned int jobIdx = 0; jobIdx < lstSize(this->jobList); jobIdx++)
// Get a new job
ProtocolParallelJob *job = NULL;
MEM_CONTEXT_BEGIN(lstMemContext(this->jobList))
{
ProtocolParallelJob *job = *(ProtocolParallelJob **)lstGet(this->jobList, jobIdx);
job = this->callbackFunction(this->callbackData, clientIdx);
}
MEM_CONTEXT_END();
if (protocolParallelJobState(job) == protocolParallelJobStatePending)
{
protocolClientWriteCommand(
*(ProtocolClient **)lstGet(this->clientList, clientIdx), protocolParallelJobCommand(job));
// If a new job was found
if (job != NULL)
{
// Add to the job list
lstAdd(this->jobList, &job);
protocolParallelJobProcessIdSet(job, clientIdx + 1);
protocolParallelJobStateSet(job, protocolParallelJobStateRunning);
this->clientJobList[clientIdx] = job;
// Send the job to the client
protocolClientWriteCommand(
*(ProtocolClient **)lstGet(this->clientList, clientIdx), protocolParallelJobCommand(job));
break;
}
// Set client id and running state
protocolParallelJobProcessIdSet(job, clientIdx + 1);
protocolParallelJobStateSet(job, protocolParallelJobStateRunning);
this->clientJobList[clientIdx] = job;
}
}
}

View File

@@ -16,16 +16,23 @@ typedef struct ProtocolParallel ProtocolParallel;
#include "protocol/client.h"
#include "protocol/parallelJob.h"
/***********************************************************************************************************************************
Job request callback

Called whenever a new job is required for processing. data is the callbackData pointer that was passed to protocolParallelNew() and
clientIdx is the index of the client that currently has no job running. protocolParallelProcess() invokes the callback with the job
list's mem context as the current context.

If no more jobs are available then NULL is returned. Note that NULL must be returned to each clientIdx in case job distribution
varies by clientIdx.
***********************************************************************************************************************************/
typedef ProtocolParallelJob *ParallelJobCallback(void *data, unsigned int clientIdx);
/***********************************************************************************************************************************
Constructor
***********************************************************************************************************************************/
ProtocolParallel *protocolParallelNew(TimeMSec timeout);
ProtocolParallel *protocolParallelNew(TimeMSec timeout, ParallelJobCallback *callbackFunction, void *callbackData);
/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
void protocolParallelClientAdd(ProtocolParallel *this, ProtocolClient *client);
void protocolParallelJobAdd(ProtocolParallel *this, ProtocolParallelJob *job);
unsigned int protocolParallelProcess(ProtocolParallel *this);
/***********************************************************************************************************************************

View File

@@ -54,6 +54,40 @@ testServerProtocol(const String *command, const VariantList *paramList, Protocol
FUNCTION_HARNESS_RESULT(BOOL, found);
}
/***********************************************************************************************************************************
Test ParallelJobCallback
***********************************************************************************************************************************/
// State handed to testParallelJobCallback() through the opaque callback data pointer. The test fills jobList up front and the
// callback hands the jobs out one at a time, advancing jobIdx. clientSeen is indexed directly by clientIdx without a bounds check.
// NOTE(review): this presumably relies on the test registering at most two clients -- confirm against the test body.
typedef struct TestParallelJobCallback
{
List *jobList; // List of jobs to process
unsigned int jobIdx; // Current index in the list to be processed
bool clientSeen[2]; // Make sure the client idx was seen
} TestParallelJobCallback;
// Test job callback. Records which client asked for a job, then hands out the pre-built jobs in list order and returns NULL once
// all of them have been handed out. Each returned job is moved into the mem context that is current when the callback is called.
static ProtocolParallelJob *
testParallelJobCallback(void *data, unsigned int clientIdx)
{
    FUNCTION_TEST_BEGIN();
        FUNCTION_TEST_PARAM_P(VOID, data);
        FUNCTION_TEST_PARAM(UINT, clientIdx);
    FUNCTION_TEST_END();

    TestParallelJobCallback *const state = data;

    // Record the request even when no job is left, so the test can tell that this client was asked
    state->clientSeen[clientIdx] = true;

    // Nothing left to hand out
    if (state->jobIdx >= lstSize(state->jobList))
    {
        FUNCTION_TEST_RETURN(NULL);
    }

    // Take the next job and step past it so the following call gets a different one
    ProtocolParallelJob *const nextJob = *(ProtocolParallelJob **)lstGet(state->jobList, state->jobIdx);
    state->jobIdx++;

    FUNCTION_TEST_RETURN(protocolParallelJobMove(nextJob, memContextCurrent()));
}
/***********************************************************************************************************************************
Test Run
***********************************************************************************************************************************/
@@ -633,8 +667,9 @@ testRun(void)
HARNESS_FORK_PARENT_BEGIN()
{
// -----------------------------------------------------------------------------------------------------------------
TestParallelJobCallback data = {.jobList = lstNew(sizeof(ProtocolParallelJob *))};
ProtocolParallel *parallel = NULL;
TEST_ASSIGN(parallel, protocolParallelNew(2000), "create parallel");
TEST_ASSIGN(parallel, protocolParallelNew(2000, testParallelJobCallback, &data), "create parallel");
TEST_RESULT_STR(
strPtr(protocolParallelToLog(parallel)), "{state: pending, clientTotal: 0, jobTotal: 0}", "check log");
@@ -676,18 +711,18 @@ testRun(void)
ProtocolCommand *command = protocolCommandNew(strNew("command1"));
protocolCommandParamAdd(command, varNewStr(strNew("param1")));
protocolCommandParamAdd(command, varNewStr(strNew("param2")));
TEST_RESULT_VOID(
protocolParallelJobAdd(parallel, protocolParallelJobNew(varNewStr(strNew("job1")), command)), "add job");
ProtocolParallelJob *job = protocolParallelJobNew(varNewStr(strNew("job1")), command);
TEST_RESULT_VOID(lstAdd(data.jobList, &job), "add job");
command = protocolCommandNew(strNew("command2"));
protocolCommandParamAdd(command, varNewStr(strNew("param1")));
TEST_RESULT_VOID(
protocolParallelJobAdd(parallel, protocolParallelJobNew(varNewStr(strNew("job2")), command)), "add job");
job = protocolParallelJobNew(varNewStr(strNew("job2")), command);
TEST_RESULT_VOID(lstAdd(data.jobList, &job), "add job");
command = protocolCommandNew(strNew("command3"));
protocolCommandParamAdd(command, varNewStr(strNew("param1")));
TEST_RESULT_VOID(
protocolParallelJobAdd(parallel, protocolParallelJobNew(varNewStr(strNew("job3")), command)), "add job");
job = protocolParallelJobNew(varNewStr(strNew("job3")), command);
TEST_RESULT_VOID(lstAdd(data.jobList, &job), "add job");
// Process jobs
TEST_RESULT_INT(protocolParallelProcess(parallel), 0, "process jobs");