1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-04-13 11:30:40 +02:00
pgbackrest/src/protocol/parallel.c
David Steele 1bd5530a59 Remove double spaces from comments and documentation.
Double spaces have fallen out of favor in recent years because they no longer contribute to readability.

We have been using single spaces and editing related paragraphs for some time, but now it seems best to update the remaining instances to avoid churn in unrelated commits and to make it clearer what spacing contributors should use.
2023-05-02 12:57:12 +03:00

281 lines
11 KiB
C

/***********************************************************************************************************************************
Protocol Parallel Executor
***********************************************************************************************************************************/
#include "build.auto.h"
#include <string.h>
#include <sys/select.h>
#include "common/debug.h"
#include "common/log.h"
#include "common/macro.h"
#include "common/type/keyValue.h"
#include "common/type/list.h"
#include "protocol/command.h"
#include "protocol/helper.h"
#include "protocol/parallel.h"
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
struct ProtocolParallel
{
TimeMSec timeout; // Max time to wait for jobs before returning
ParallelJobCallback *callbackFunction; // Function to get new jobs
void *callbackData; // Data to pass to callback function
List *clientList; // List of clients to process jobs
List *jobList; // List of jobs to be processed
ProtocolParallelJob **clientJobList; // Jobs being processing by each client
ProtocolParallelJobState state; // Overall state of job processing
};
/**********************************************************************************************************************************/
FN_EXTERN ProtocolParallel *
protocolParallelNew(TimeMSec timeout, ParallelJobCallback *callbackFunction, void *callbackData)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(UINT64, timeout);
FUNCTION_LOG_PARAM(FUNCTIONP, callbackFunction);
FUNCTION_LOG_PARAM_P(VOID, callbackData);
FUNCTION_LOG_END();
ASSERT(callbackFunction != NULL);
ASSERT(callbackData != NULL);
OBJ_NEW_BEGIN(ProtocolParallel, .childQty = MEM_CONTEXT_QTY_MAX, .allocQty = MEM_CONTEXT_QTY_MAX)
{
*this = (ProtocolParallel)
{
.timeout = timeout,
.callbackFunction = callbackFunction,
.callbackData = callbackData,
.clientList = lstNewP(sizeof(ProtocolClient *)),
.jobList = lstNewP(sizeof(ProtocolParallelJob *)),
.state = protocolParallelJobStatePending,
};
}
OBJ_NEW_END();
FUNCTION_LOG_RETURN(PROTOCOL_PARALLEL, this);
}
/**********************************************************************************************************************************/
FN_EXTERN void
protocolParallelClientAdd(ProtocolParallel *this, ProtocolClient *client)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(PROTOCOL_PARALLEL, this);
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(client != NULL);
ASSERT(this->state == protocolParallelJobStatePending);
if (protocolClientIoReadFd(client) == -1)
THROW(AssertError, "client with read fd is required");
lstAdd(this->clientList, &client);
FUNCTION_LOG_RETURN_VOID();
}
/**********************************************************************************************************************************/
FN_EXTERN unsigned int
protocolParallelProcess(ProtocolParallel *this)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(PROTOCOL_PARALLEL, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(this->state != protocolParallelJobStateDone);
unsigned int result = 0;
MEM_CONTEXT_TEMP_BEGIN()
{
// If called for the first time, initialize processing
if (this->state == protocolParallelJobStatePending)
{
MEM_CONTEXT_OBJ_BEGIN(this)
{
this->clientJobList = memNewPtrArray(lstSize(this->clientList));
}
MEM_CONTEXT_OBJ_END();
this->state = protocolParallelJobStateRunning;
}
// Initialize the file descriptor set used for select
fd_set selectSet;
FD_ZERO(&selectSet);
int fdMax = -1;
// Find clients that are running jobs
unsigned int clientRunningTotal = 0;
for (unsigned int clientIdx = 0; clientIdx < lstSize(this->clientList); clientIdx++)
{
if (this->clientJobList[clientIdx] != NULL)
{
int fd = protocolClientIoReadFd(*(ProtocolClient **)lstGet(this->clientList, clientIdx));
FD_SET(fd, &selectSet);
// Find the max file descriptor needed for select()
MAX_ASSIGN(fdMax, fd);
clientRunningTotal++;
}
}
// If clients are running then wait for one to finish
if (clientRunningTotal > 0)
{
// Initialize timeout struct used for select. Recreate this structure each time since Linux (at least) will modify it.
struct timeval timeoutSelect;
timeoutSelect.tv_sec = (time_t)(this->timeout / MSEC_PER_SEC);
timeoutSelect.tv_usec = (suseconds_t)(this->timeout % MSEC_PER_SEC * 1000);
// Determine if there is data to be read
int completed = select(fdMax + 1, &selectSet, NULL, NULL, &timeoutSelect);
THROW_ON_SYS_ERROR(completed == -1, AssertError, "unable to select from parallel client(s)");
// If any jobs have completed then get the results
if (completed > 0)
{
for (unsigned int clientIdx = 0; clientIdx < lstSize(this->clientList); clientIdx++)
{
ProtocolParallelJob *job = this->clientJobList[clientIdx];
if (job != NULL &&
FD_ISSET(
protocolClientIoReadFd(*(ProtocolClient **)lstGet(this->clientList, clientIdx)),
&selectSet))
{
MEM_CONTEXT_TEMP_BEGIN()
{
TRY_BEGIN()
{
ProtocolClient *const client = *(ProtocolClient **)lstGet(this->clientList, clientIdx);
protocolParallelJobResultSet(job, protocolClientDataGet(client));
protocolClientDataEndGet(client);
}
CATCH_ANY()
{
protocolParallelJobErrorSet(job, errorCode(), STR(errorMessage()));
}
TRY_END();
protocolParallelJobStateSet(job, protocolParallelJobStateDone);
this->clientJobList[clientIdx] = NULL;
}
MEM_CONTEXT_TEMP_END();
}
}
result = (unsigned int)completed;
}
}
// Find new jobs to be run
for (unsigned int clientIdx = 0; clientIdx < lstSize(this->clientList); clientIdx++)
{
// If nothing is running for this client
if (this->clientJobList[clientIdx] == NULL)
{
// Get a new job
ProtocolParallelJob *job = NULL;
MEM_CONTEXT_BEGIN(lstMemContext(this->jobList))
{
job = this->callbackFunction(this->callbackData, clientIdx);
}
MEM_CONTEXT_END();
// If a new job was found
if (job != NULL)
{
// Add to the job list
lstAdd(this->jobList, &job);
// Put command
protocolClientCommandPut(
*(ProtocolClient **)lstGet(this->clientList, clientIdx), protocolParallelJobCommand(job), false);
// Set client id and running state
protocolParallelJobProcessIdSet(job, clientIdx + 1);
protocolParallelJobStateSet(job, protocolParallelJobStateRunning);
this->clientJobList[clientIdx] = job;
}
// Else no more jobs for this client so free it
else
protocolLocalFree(clientIdx + 1);
}
}
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN(UINT, result);
}
/**********************************************************************************************************************************/
FN_EXTERN ProtocolParallelJob *
protocolParallelResult(ProtocolParallel *this)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(PROTOCOL_PARALLEL, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(this->state == protocolParallelJobStateRunning);
ProtocolParallelJob *result = NULL;
// Find the next completed job
for (unsigned int jobIdx = 0; jobIdx < lstSize(this->jobList); jobIdx++)
{
ProtocolParallelJob *job = *(ProtocolParallelJob **)lstGet(this->jobList, jobIdx);
if (protocolParallelJobState(job) == protocolParallelJobStateDone)
{
result = protocolParallelJobMove(job, memContextCurrent());
lstRemoveIdx(this->jobList, jobIdx);
break;
}
}
FUNCTION_LOG_RETURN(PROTOCOL_PARALLEL_JOB, result);
}
/**********************************************************************************************************************************/
FN_EXTERN bool
protocolParallelDone(ProtocolParallel *this)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(PROTOCOL_PARALLEL, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(this->state != protocolParallelJobStatePending);
// If there are no jobs left then we are done
if (this->state != protocolParallelJobStateDone && lstEmpty(this->jobList))
this->state = protocolParallelJobStateDone;
FUNCTION_LOG_RETURN(BOOL, this->state == protocolParallelJobStateDone);
}
/**********************************************************************************************************************************/
FN_EXTERN void
protocolParallelToLog(const ProtocolParallel *const this, StringStatic *const debugLog)
{
strStcCat(debugLog, "{state: ");
strStcResultSizeInc(debugLog, strIdToLog(this->state, strStcRemains(debugLog), strStcRemainsSize(debugLog)));
strStcFmt(debugLog, ", clientTotal: %u, jobTotal: %u}", lstSize(this->clientList), lstSize(this->jobList));
}