mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-01-18 04:58:51 +02:00
Refactor protocol modules with inline getters/setters.
Extend the pattern introduced in 79a2d02c to the protocol modules.
This commit is contained in:
parent
235e32b57d
commit
4937653a3d
@ -32,11 +32,9 @@ Object type
|
||||
***********************************************************************************************************************************/
|
||||
struct ProtocolClient
|
||||
{
|
||||
MemContext *memContext;
|
||||
ProtocolClientPub pub; // Publicly accessible variables
|
||||
const String *name;
|
||||
const String *errorPrefix;
|
||||
IoRead *read;
|
||||
IoWrite *write;
|
||||
TimeMSec keepAliveTime;
|
||||
};
|
||||
|
||||
@ -87,18 +85,21 @@ protocolClientNew(const String *name, const String *service, IoRead *read, IoWri
|
||||
|
||||
*this = (ProtocolClient)
|
||||
{
|
||||
.memContext = memContextCurrent(),
|
||||
.pub =
|
||||
{
|
||||
.memContext = memContextCurrent(),
|
||||
.read = read,
|
||||
.write = write,
|
||||
},
|
||||
.name = strDup(name),
|
||||
.errorPrefix = strNewFmt("raised from %s", strZ(name)),
|
||||
.read = read,
|
||||
.write = write,
|
||||
.keepAliveTime = timeMSec(),
|
||||
};
|
||||
|
||||
// Read, parse, and check the protocol greeting
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
String *greeting = ioReadLine(this->read);
|
||||
String *greeting = ioReadLine(protocolClientIoRead(this));
|
||||
KeyValue *greetingKv = jsonToKv(greeting);
|
||||
|
||||
const String *expected[] =
|
||||
@ -137,7 +138,7 @@ protocolClientNew(const String *name, const String *service, IoRead *read, IoWri
|
||||
protocolClientNoOp(this);
|
||||
|
||||
// Set a callback to shutdown the protocol
|
||||
memContextCallbackSet(this->memContext, protocolClientFreeResource, this);
|
||||
memContextCallbackSet(this->pub.memContext, protocolClientFreeResource, this);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
@ -203,7 +204,7 @@ protocolClientReadOutput(ProtocolClient *this, bool outputRequired)
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Read the response
|
||||
String *response = ioReadLine(this->read);
|
||||
String *response = ioReadLine(protocolClientIoRead(this));
|
||||
KeyValue *responseKv = varKv(jsonToVar(response));
|
||||
|
||||
// Process error if any
|
||||
@ -242,8 +243,8 @@ protocolClientWriteCommand(ProtocolClient *this, const ProtocolCommand *command)
|
||||
ASSERT(command != NULL);
|
||||
|
||||
// Write out the command
|
||||
ioWriteStrLine(this->write, protocolCommandJson(command));
|
||||
ioWriteFlush(this->write);
|
||||
ioWriteStrLine(protocolClientIoWrite(this), protocolCommandJson(command));
|
||||
ioWriteFlush(protocolClientIoWrite(this));
|
||||
|
||||
// Reset the keep alive time
|
||||
this->keepAliveTime = timeMSec();
|
||||
@ -302,7 +303,7 @@ protocolClientReadLine(ProtocolClient *this)
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
result = ioReadLine(this->read);
|
||||
result = ioReadLine(protocolClientIoRead(this));
|
||||
|
||||
if (strSize(result) == 0)
|
||||
{
|
||||
@ -332,32 +333,6 @@ protocolClientReadLine(ProtocolClient *this)
|
||||
FUNCTION_LOG_RETURN(STRING, result);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
IoRead *
|
||||
protocolClientIoRead(const ProtocolClient *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_CLIENT, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->read);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
IoWrite *
|
||||
protocolClientIoWrite(const ProtocolClient *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_CLIENT, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->write);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
String *
|
||||
protocolClientToLog(const ProtocolClient *this)
|
||||
|
@ -42,6 +42,32 @@ Constructors
|
||||
***********************************************************************************************************************************/
|
||||
ProtocolClient *protocolClientNew(const String *name, const String *service, IoRead *read, IoWrite *write);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
typedef struct ProtocolClientPub
|
||||
{
|
||||
MemContext *memContext; // Mem context
|
||||
IoRead *read; // Read interface
|
||||
IoWrite *write; // Write interface
|
||||
} ProtocolClientPub;
|
||||
|
||||
// Read interface
|
||||
__attribute__((always_inline)) static inline IoRead *
|
||||
protocolClientIoRead(ProtocolClient *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolClientPub *)this)->read;
|
||||
}
|
||||
|
||||
// Write interface
|
||||
__attribute__((always_inline)) static inline IoWrite *
|
||||
protocolClientIoWrite(ProtocolClient *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolClientPub *)this)->write;
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
@ -67,15 +93,6 @@ const Variant *protocolClientReadOutput(ProtocolClient *this, bool outputRequire
|
||||
// Write the protocol command
|
||||
void protocolClientWriteCommand(ProtocolClient *this, const ProtocolCommand *command);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
// Read interface
|
||||
IoRead *protocolClientIoRead(const ProtocolClient *this);
|
||||
|
||||
// Write interface
|
||||
IoWrite *protocolClientIoWrite(const ProtocolClient *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Destructor
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -25,6 +25,12 @@ Constructors
|
||||
***********************************************************************************************************************************/
|
||||
ProtocolCommand *protocolCommandNew(const String *command);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
// Command JSON
|
||||
String *protocolCommandJson(const ProtocolCommand *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
@ -39,12 +45,6 @@ protocolCommandMove(ProtocolCommand *this, MemContext *parentNew)
|
||||
// Read the command output
|
||||
ProtocolCommand *protocolCommandParamAdd(ProtocolCommand *this, const Variant *param);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
// Command JSON
|
||||
String *protocolCommandJson(const ProtocolCommand *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Destructor
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -26,24 +26,6 @@ Constants
|
||||
#define PROTOCOL_REMOTE_TYPE_PG "pg"
|
||||
#define PROTOCOL_REMOTE_TYPE_REPO "repo"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
// Send keepalives to all remotes
|
||||
void protocolKeepAlive(void);
|
||||
|
||||
// Local protocol client
|
||||
ProtocolClient *protocolLocalGet(ProtocolStorageType protocolStorageType, unsigned int hostId, unsigned int protocolId);
|
||||
|
||||
// Free (shutdown) a local
|
||||
void protocolLocalFree(unsigned int protocolId);
|
||||
|
||||
// Remote protocol client
|
||||
ProtocolClient *protocolRemoteGet(ProtocolStorageType protocolStorageType, unsigned int hostId);
|
||||
|
||||
// Free (shutdown) a remote
|
||||
void protocolRemoteFree(unsigned int hostId);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
@ -64,6 +46,24 @@ void repoIsLocalVerifyIdx(unsigned int repoIdx);
|
||||
ProtocolStorageType protocolStorageTypeEnum(const String *type);
|
||||
const String *protocolStorageTypeStr(ProtocolStorageType type);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
// Send keepalives to all remotes
|
||||
void protocolKeepAlive(void);
|
||||
|
||||
// Local protocol client
|
||||
ProtocolClient *protocolLocalGet(ProtocolStorageType protocolStorageType, unsigned int hostId, unsigned int protocolId);
|
||||
|
||||
// Free (shutdown) a local
|
||||
void protocolLocalFree(unsigned int protocolId);
|
||||
|
||||
// Remote protocol client
|
||||
ProtocolClient *protocolRemoteGet(ProtocolStorageType protocolStorageType, unsigned int hostId);
|
||||
|
||||
// Free (shutdown) a remote
|
||||
void protocolRemoteFree(unsigned int hostId);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Destructor
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -27,15 +27,6 @@ Constructors
|
||||
***********************************************************************************************************************************/
|
||||
ProtocolParallel *protocolParallelNew(TimeMSec timeout, ParallelJobCallback *callbackFunction, void *callbackData);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
// Add client
|
||||
void protocolParallelClientAdd(ProtocolParallel *this, ProtocolClient *client);
|
||||
|
||||
// Process jobs
|
||||
unsigned int protocolParallelProcess(ProtocolParallel *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
@ -45,6 +36,15 @@ bool protocolParallelDone(ProtocolParallel *this);
|
||||
// Completed job result
|
||||
ProtocolParallelJob *protocolParallelResult(ProtocolParallel *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
// Add client
|
||||
void protocolParallelClientAdd(ProtocolParallel *this, ProtocolClient *client);
|
||||
|
||||
// Process jobs
|
||||
unsigned int protocolParallelProcess(ProtocolParallel *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Destructor
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -14,16 +14,7 @@ Object type
|
||||
***********************************************************************************************************************************/
|
||||
struct ProtocolParallelJob
|
||||
{
|
||||
MemContext *memContext; // Job mem context
|
||||
ProtocolParallelJobState state; // Current state of the job
|
||||
|
||||
const Variant *key; // Unique key used to identify the job
|
||||
const ProtocolCommand *command; // Command to be executed
|
||||
|
||||
unsigned int processId; // Process that executed this job
|
||||
int code; // Non-zero result indicates an error
|
||||
String *message; // Message if there was a error
|
||||
const Variant *result; // Result if job was successful
|
||||
ProtocolParallelJobPub pub; // Publicly accessible variables
|
||||
};
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
@ -43,12 +34,15 @@ protocolParallelJobNew(const Variant *key, ProtocolCommand *command)
|
||||
|
||||
*this = (ProtocolParallelJob)
|
||||
{
|
||||
.memContext = memContextCurrent(),
|
||||
.state = protocolParallelJobStatePending,
|
||||
.key = varDup(key),
|
||||
.pub =
|
||||
{
|
||||
.memContext = memContextCurrent(),
|
||||
.state = protocolParallelJobStatePending,
|
||||
.key = varDup(key),
|
||||
},
|
||||
};
|
||||
|
||||
this->command = protocolCommandMove(command, this->memContext);
|
||||
this->pub.command = protocolCommandMove(command, this->pub.memContext);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
@ -56,43 +50,6 @@ protocolParallelJobNew(const Variant *key, ProtocolCommand *command)
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
const ProtocolCommand *
|
||||
protocolParallelJobCommand(const ProtocolParallelJob *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_PARALLEL_JOB, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->command);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
int
|
||||
protocolParallelJobErrorCode(const ProtocolParallelJob *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_PARALLEL_JOB, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->code);
|
||||
}
|
||||
|
||||
const String *
|
||||
protocolParallelJobErrorMessage(const ProtocolParallelJob *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_PARALLEL_JOB, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->message);
|
||||
}
|
||||
|
||||
void
|
||||
protocolParallelJobErrorSet(ProtocolParallelJob *this, int code, const String *message)
|
||||
{
|
||||
@ -106,10 +63,10 @@ protocolParallelJobErrorSet(ProtocolParallelJob *this, int code, const String *m
|
||||
ASSERT(code != 0);
|
||||
ASSERT(message != NULL);
|
||||
|
||||
MEM_CONTEXT_BEGIN(this->memContext)
|
||||
MEM_CONTEXT_BEGIN(this->pub.memContext)
|
||||
{
|
||||
this->code = code;
|
||||
this->message = strDup(message);
|
||||
this->pub.code = code;
|
||||
this->pub.message = strDup(message);
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
|
||||
@ -117,31 +74,6 @@ protocolParallelJobErrorSet(ProtocolParallelJob *this, int code, const String *m
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
const Variant *
|
||||
protocolParallelJobKey(const ProtocolParallelJob *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_PARALLEL_JOB, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->key);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
unsigned int
|
||||
protocolParallelJobProcessId(const ProtocolParallelJob *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_PARALLEL_JOB, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->processId);
|
||||
}
|
||||
|
||||
void
|
||||
protocolParallelJobProcessIdSet(ProtocolParallelJob *this, unsigned int processId)
|
||||
{
|
||||
@ -153,24 +85,12 @@ protocolParallelJobProcessIdSet(ProtocolParallelJob *this, unsigned int processI
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(processId > 0);
|
||||
|
||||
this ->processId = processId;
|
||||
this->pub.processId = processId;
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
const Variant *
|
||||
protocolParallelJobResult(const ProtocolParallelJob *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_PARALLEL_JOB, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->result);
|
||||
}
|
||||
|
||||
void
|
||||
protocolParallelJobResultSet(ProtocolParallelJob *this, const Variant *result)
|
||||
{
|
||||
@ -180,11 +100,11 @@ protocolParallelJobResultSet(ProtocolParallelJob *this, const Variant *result)
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(this->code == 0);
|
||||
ASSERT(protocolParallelJobErrorCode(this) == 0);
|
||||
|
||||
MEM_CONTEXT_BEGIN(this->memContext)
|
||||
MEM_CONTEXT_BEGIN(this->pub.memContext)
|
||||
{
|
||||
this->result = varDup(result);
|
||||
this->pub.result = varDup(result);
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
|
||||
@ -192,18 +112,6 @@ protocolParallelJobResultSet(ProtocolParallelJob *this, const Variant *result)
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
ProtocolParallelJobState
|
||||
protocolParallelJobState(const ProtocolParallelJob *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_PARALLEL_JOB, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->state);
|
||||
}
|
||||
|
||||
void
|
||||
protocolParallelJobStateSet(ProtocolParallelJob *this, ProtocolParallelJobState state)
|
||||
{
|
||||
@ -214,14 +122,14 @@ protocolParallelJobStateSet(ProtocolParallelJob *this, ProtocolParallelJobState
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
if (this->state == protocolParallelJobStatePending && state == protocolParallelJobStateRunning)
|
||||
this->state = protocolParallelJobStateRunning;
|
||||
else if (this->state == protocolParallelJobStateRunning && state == protocolParallelJobStateDone)
|
||||
this->state = protocolParallelJobStateDone;
|
||||
if (this->pub.state == protocolParallelJobStatePending && state == protocolParallelJobStateRunning)
|
||||
this->pub.state = protocolParallelJobStateRunning;
|
||||
else if (this->pub.state == protocolParallelJobStateRunning && state == protocolParallelJobStateDone)
|
||||
this->pub.state = protocolParallelJobStateDone;
|
||||
else
|
||||
{
|
||||
THROW_FMT(
|
||||
AssertError, "invalid state transition from '%s' to '%s'", protocolParallelJobToConstZ(this->state),
|
||||
AssertError, "invalid state transition from '%s' to '%s'", protocolParallelJobToConstZ(protocolParallelJobState(this)),
|
||||
protocolParallelJobToConstZ(state));
|
||||
}
|
||||
|
||||
@ -256,7 +164,8 @@ String *
|
||||
protocolParallelJobToLog(const ProtocolParallelJob *this)
|
||||
{
|
||||
return strNewFmt(
|
||||
"{state: %s, key: %s, command: %s, code: %d, message: %s, result: %s}", protocolParallelJobToConstZ(this->state),
|
||||
strZ(varToLog(this->key)), strZ(protocolCommandToLog(this->command)), this->code, strZ(strToLog(this->message)),
|
||||
strZ(varToLog(this->result)));
|
||||
"{state: %s, key: %s, command: %s, code: %d, message: %s, result: %s}",
|
||||
protocolParallelJobToConstZ(protocolParallelJobState(this)), strZ(varToLog(protocolParallelJobKey(this))),
|
||||
strZ(protocolCommandToLog(protocolParallelJobCommand(this))), protocolParallelJobErrorCode(this),
|
||||
strZ(strToLog(protocolParallelJobErrorMessage(this))), strZ(varToLog(protocolParallelJobResult(this))));
|
||||
}
|
||||
|
@ -28,6 +28,84 @@ Constructors
|
||||
***********************************************************************************************************************************/
|
||||
ProtocolParallelJob *protocolParallelJobNew(const Variant *key, ProtocolCommand *command);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
typedef struct ProtocolParallelJobPub
|
||||
{
|
||||
MemContext *memContext; // Mem context
|
||||
const Variant *key; // Unique key used to identify the job
|
||||
const ProtocolCommand *command; // Command to be executed
|
||||
unsigned int processId; // Process that executed this job
|
||||
ProtocolParallelJobState state; // Current state of the job
|
||||
int code; // Non-zero result indicates an error
|
||||
String *message; // Message if there was a error
|
||||
const Variant *result; // Result if job was successful
|
||||
} ProtocolParallelJobPub;
|
||||
|
||||
// Job command
|
||||
__attribute__((always_inline)) static inline const ProtocolCommand *
|
||||
protocolParallelJobCommand(const ProtocolParallelJob *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolParallelJobPub *)this)->command;
|
||||
}
|
||||
|
||||
// Job error
|
||||
__attribute__((always_inline)) static inline int
|
||||
protocolParallelJobErrorCode(const ProtocolParallelJob *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolParallelJobPub *)this)->code;
|
||||
}
|
||||
|
||||
__attribute__((always_inline)) static inline const String *
|
||||
protocolParallelJobErrorMessage(const ProtocolParallelJob *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolParallelJobPub *)this)->message;
|
||||
}
|
||||
|
||||
void protocolParallelJobErrorSet(ProtocolParallelJob *this, int code, const String *message);
|
||||
|
||||
// Job key
|
||||
__attribute__((always_inline)) static inline const Variant *
|
||||
protocolParallelJobKey(const ProtocolParallelJob *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolParallelJobPub *)this)->key;
|
||||
}
|
||||
|
||||
// Process Id
|
||||
__attribute__((always_inline)) static inline unsigned int
|
||||
protocolParallelJobProcessId(const ProtocolParallelJob *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolParallelJobPub *)this)->processId;
|
||||
}
|
||||
|
||||
void protocolParallelJobProcessIdSet(ProtocolParallelJob *this, unsigned int processId);
|
||||
|
||||
// Job result
|
||||
__attribute__((always_inline)) static inline const Variant *
|
||||
protocolParallelJobResult(const ProtocolParallelJob *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolParallelJobPub *)this)->result;
|
||||
}
|
||||
|
||||
void protocolParallelJobResultSet(ProtocolParallelJob *this, const Variant *result);
|
||||
|
||||
// Job state
|
||||
__attribute__((always_inline)) static inline ProtocolParallelJobState
|
||||
protocolParallelJobState(const ProtocolParallelJob *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolParallelJobPub *)this)->state;
|
||||
}
|
||||
|
||||
void protocolParallelJobStateSet(ProtocolParallelJob *this, ProtocolParallelJobState state);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
@ -38,32 +116,6 @@ protocolParallelJobMove(ProtocolParallelJob *this, MemContext *parentNew)
|
||||
return objMove(this, parentNew);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
// Job command
|
||||
const ProtocolCommand *protocolParallelJobCommand(const ProtocolParallelJob *this);
|
||||
|
||||
// Job error
|
||||
int protocolParallelJobErrorCode(const ProtocolParallelJob *this);
|
||||
const String *protocolParallelJobErrorMessage(const ProtocolParallelJob *this);
|
||||
void protocolParallelJobErrorSet(ProtocolParallelJob *this, int code, const String *message);
|
||||
|
||||
// Job key
|
||||
const Variant *protocolParallelJobKey(const ProtocolParallelJob *this);
|
||||
|
||||
// Process Id
|
||||
unsigned int protocolParallelJobProcessId(const ProtocolParallelJob *this);
|
||||
void protocolParallelJobProcessIdSet(ProtocolParallelJob *this, unsigned int processId);
|
||||
|
||||
// Job result
|
||||
const Variant *protocolParallelJobResult(const ProtocolParallelJob *this);
|
||||
void protocolParallelJobResultSet(ProtocolParallelJob *this, const Variant *result);
|
||||
|
||||
// Job state
|
||||
ProtocolParallelJobState protocolParallelJobState(const ProtocolParallelJob *this);
|
||||
void protocolParallelJobStateSet(ProtocolParallelJob *this, ProtocolParallelJobState state);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Destructor
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -22,10 +22,8 @@ Object type
|
||||
***********************************************************************************************************************************/
|
||||
struct ProtocolServer
|
||||
{
|
||||
MemContext *memContext;
|
||||
ProtocolServerPub pub; // Publicly accessible variables
|
||||
const String *name;
|
||||
IoRead *read;
|
||||
IoWrite *write;
|
||||
};
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
@ -51,10 +49,13 @@ protocolServerNew(const String *name, const String *service, IoRead *read, IoWri
|
||||
|
||||
*this = (ProtocolServer)
|
||||
{
|
||||
.memContext = memContextCurrent(),
|
||||
.pub =
|
||||
{
|
||||
.memContext = memContextCurrent(),
|
||||
.read = read,
|
||||
.write = write,
|
||||
},
|
||||
.name = strDup(name),
|
||||
.read = read,
|
||||
.write = write,
|
||||
};
|
||||
|
||||
// Send the protocol greeting
|
||||
@ -65,8 +66,8 @@ protocolServerNew(const String *name, const String *service, IoRead *read, IoWri
|
||||
kvPut(greetingKv, VARSTR(PROTOCOL_GREETING_SERVICE_STR), VARSTR(service));
|
||||
kvPut(greetingKv, VARSTR(PROTOCOL_GREETING_VERSION_STR), VARSTRZ(PROJECT_VERSION));
|
||||
|
||||
ioWriteStrLine(this->write, jsonFromKv(greetingKv));
|
||||
ioWriteFlush(this->write);
|
||||
ioWriteStrLine(protocolServerIoWrite(this), jsonFromKv(greetingKv));
|
||||
ioWriteFlush(protocolServerIoWrite(this));
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
}
|
||||
@ -96,8 +97,8 @@ protocolServerError(ProtocolServer *this, int code, const String *message, const
|
||||
kvPut(error, VARSTR(PROTOCOL_OUTPUT_STR), VARSTR(message));
|
||||
kvPut(error, VARSTR(PROTOCOL_ERROR_STACK_STR), VARSTR(stack));
|
||||
|
||||
ioWriteStrLine(this->write, jsonFromKv(error));
|
||||
ioWriteFlush(this->write);
|
||||
ioWriteStrLine(protocolServerIoWrite(this), jsonFromKv(error));
|
||||
ioWriteFlush(protocolServerIoWrite(this));
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
@ -129,7 +130,7 @@ protocolServerProcess(
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Read command
|
||||
KeyValue *commandKv = jsonToKv(ioReadLine(this->read));
|
||||
KeyValue *commandKv = jsonToKv(ioReadLine(protocolServerIoRead(this)));
|
||||
const String *command = varStr(kvGet(commandKv, VARSTR(PROTOCOL_KEY_COMMAND_STR)));
|
||||
VariantList *paramList = varVarLst(kvGet(commandKv, VARSTR(PROTOCOL_KEY_PARAMETER_STR)));
|
||||
|
||||
@ -150,7 +151,7 @@ protocolServerProcess(
|
||||
{
|
||||
// Send the command to the handler. Run the handler in the server's memory context in case any persistent data
|
||||
// needs to be stored by the handler.
|
||||
MEM_CONTEXT_BEGIN(this->memContext)
|
||||
MEM_CONTEXT_BEGIN(this->pub.memContext)
|
||||
{
|
||||
// Initialize retries in case of command failure
|
||||
bool retry = false;
|
||||
@ -244,15 +245,15 @@ protocolServerResponse(ProtocolServer *this, const Variant *output)
|
||||
if (output != NULL)
|
||||
kvAdd(result, VARSTR(PROTOCOL_OUTPUT_STR), output);
|
||||
|
||||
ioWriteStrLine(this->write, jsonFromKv(result));
|
||||
ioWriteFlush(this->write);
|
||||
ioWriteStrLine(protocolServerIoWrite(this), jsonFromKv(result));
|
||||
ioWriteFlush(protocolServerIoWrite(this));
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
void
|
||||
protocolServerWriteLine(const ProtocolServer *this, const String *line)
|
||||
protocolServerWriteLine(ProtocolServer *this, const String *line)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
|
||||
@ -262,44 +263,18 @@ protocolServerWriteLine(const ProtocolServer *this, const String *line)
|
||||
ASSERT(this != NULL);
|
||||
|
||||
// Dot indicates the start of an lf-terminated line
|
||||
ioWrite(this->write, DOT_BUF);
|
||||
ioWrite(protocolServerIoWrite(this), DOT_BUF);
|
||||
|
||||
// Write the line if it exists
|
||||
if (line != NULL)
|
||||
ioWriteStr(this->write, line);
|
||||
ioWriteStr(protocolServerIoWrite(this), line);
|
||||
|
||||
// Terminate with a linefeed
|
||||
ioWrite(this->write, LF_BUF);
|
||||
ioWrite(protocolServerIoWrite(this), LF_BUF);
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
IoRead *
|
||||
protocolServerIoRead(const ProtocolServer *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_SERVER, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->read);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
IoWrite *
|
||||
protocolServerIoWrite(const ProtocolServer *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(PROTOCOL_SERVER, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->write);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
String *
|
||||
protocolServerToLog(const ProtocolServer *this)
|
||||
|
@ -34,6 +34,32 @@ Constructors
|
||||
***********************************************************************************************************************************/
|
||||
ProtocolServer *protocolServerNew(const String *name, const String *service, IoRead *read, IoWrite *write);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
typedef struct ProtocolServerPub
|
||||
{
|
||||
MemContext *memContext; // Mem context
|
||||
IoRead *read; // Read interface
|
||||
IoWrite *write; // Write interface
|
||||
} ProtocolServerPub;
|
||||
|
||||
// Read interface
|
||||
__attribute__((always_inline)) static inline IoRead *
|
||||
protocolServerIoRead(ProtocolServer *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolServerPub *)this)->read;
|
||||
}
|
||||
|
||||
// Write interface
|
||||
__attribute__((always_inline)) static inline IoWrite *
|
||||
protocolServerIoWrite(ProtocolServer *this)
|
||||
{
|
||||
ASSERT_INLINE(this != NULL);
|
||||
return ((ProtocolServerPub *)this)->write;
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
@ -56,16 +82,7 @@ protocolServerMove(ProtocolServer *this, MemContext *parentNew)
|
||||
}
|
||||
|
||||
// Write a line
|
||||
void protocolServerWriteLine(const ProtocolServer *this, const String *line);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
// Read interface
|
||||
IoRead *protocolServerIoRead(const ProtocolServer *this);
|
||||
|
||||
// Write interface
|
||||
IoWrite *protocolServerIoWrite(const ProtocolServer *this);
|
||||
void protocolServerWriteLine(ProtocolServer *this, const String *line);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Destructor
|
||||
|
@ -252,7 +252,7 @@ testRun(void)
|
||||
memContextFree(memContext);
|
||||
|
||||
// Create bogus client and exec with the freed memcontext to generate errors
|
||||
ProtocolClient client = {.memContext = memContext, .name = STRDEF("test")};
|
||||
ProtocolClient client = {.pub = {.memContext = memContext}, .name = STRDEF("test")};
|
||||
Exec exec = {.memContext = memContext, .name = STRDEF("test"), .command = strNew("test")};
|
||||
ProtocolHelperClient protocolHelperClient = {.client = &client, .exec = &exec};
|
||||
|
||||
@ -578,8 +578,8 @@ testRun(void)
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
TEST_RESULT_PTR(protocolClientIoRead(client), client->read, "get read io");
|
||||
TEST_RESULT_PTR(protocolClientIoWrite(client), client->write, "get write io");
|
||||
TEST_RESULT_PTR(protocolClientIoRead(client), client->pub.read, "get read io");
|
||||
TEST_RESULT_PTR(protocolClientIoWrite(client), client->pub.write, "get write io");
|
||||
|
||||
// Throw errors
|
||||
TEST_ERROR(
|
||||
@ -726,8 +726,8 @@ testRun(void)
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
TEST_RESULT_PTR(protocolServerIoRead(server), server->read, "get read io");
|
||||
TEST_RESULT_PTR(protocolServerIoWrite(server), server->write, "get write io");
|
||||
TEST_RESULT_PTR(protocolServerIoRead(server), server->pub.read, "get read io");
|
||||
TEST_RESULT_PTR(protocolServerIoWrite(server), server->pub.write, "get write io");
|
||||
|
||||
static const ProtocolServerHandler commandHandler[] =
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user