1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-18 04:58:51 +02:00

Add Exec object.

Executes a child process and allows the calling process to communicate with it using read/write io.

This object is specially tailored to implement the protocol layer and may or may not be generally applicable to general purpose
execution.
This commit is contained in:
David Steele 2019-01-18 11:45:40 +02:00
parent 797f8098d1
commit 9cac403f61
12 changed files with 526 additions and 1 deletions

View File

@ -95,6 +95,9 @@ kernel: 100
# An error from a service that is not our fault, e.g. 5xx errors from an http server. These may be retried.
service: 101
# An error while attempting to execute a binary
execute: 102
# This error should not be thrown directly -- it serves as a parent for the C errors
runtime: 122

View File

@ -29,6 +29,10 @@
</release-improvement-list>
<release-development-list>
<release-item>
<p>Add <code>Exec</code> object.</p>
</release-item>
<release-item>
<p>Add <code>IoHandleRead</code> and <code>IoHandleWrite</code> objects.</p>
</release-item>

View File

@ -171,6 +171,8 @@ use constant ERROR_KERNEL => 100;
push @EXPORT, qw(ERROR_KERNEL);
use constant ERROR_SERVICE => 101;
push @EXPORT, qw(ERROR_SERVICE);
use constant ERROR_EXECUTE => 102;
push @EXPORT, qw(ERROR_EXECUTE);
use constant ERROR_RUNTIME => 122;
push @EXPORT, qw(ERROR_RUNTIME);
use constant ERROR_INVALID => 123;

View File

@ -69,6 +69,7 @@ SRCS = \
common/encode.c \
common/encode/base64.c \
common/error.c \
common/exec.c \
common/exit.c \
common/fork.c \
common/io/bufferRead.c \
@ -110,6 +111,7 @@ SRCS = \
compress/gzipDecompress.c \
config/config.c \
config/define.c \
config/exec.c \
config/load.c \
config/parse.c \
crypto/cipherBlock.c \
@ -202,6 +204,9 @@ common/encode/base64.o: common/encode/base64.c common/debug.h common/encode/base
common/error.o: common/error.c common/error.auto.c common/error.auto.h common/error.h common/logLevel.h common/stackTrace.h
$(CC) $(CFLAGS) -c common/error.c -o common/error.o
common/exec.o: common/exec.c common/assert.h common/debug.h common/error.auto.h common/error.h common/exec.h common/io/filter/filter.h common/io/filter/group.h common/io/handleRead.h common/io/handleWrite.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h common/wait.h
$(CC) $(CFLAGS) -c common/exec.c -o common/exec.o
common/exit.o: common/exit.c command/command.h common/debug.h common/error.auto.h common/error.h common/exit.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h perl/exec.h
$(CC) $(CFLAGS) -c common/exit.c -o common/exit.o

View File

@ -83,6 +83,7 @@ ERROR_DEFINE( 98, FileInfoError, RuntimeError);
ERROR_DEFINE( 99, JsonFormatError, RuntimeError);
ERROR_DEFINE(100, KernelError, RuntimeError);
ERROR_DEFINE(101, ServiceError, RuntimeError);
ERROR_DEFINE(102, ExecuteError, RuntimeError);
ERROR_DEFINE(122, RuntimeError, RuntimeError);
ERROR_DEFINE(123, InvalidError, RuntimeError);
ERROR_DEFINE(124, UnhandledError, RuntimeError);
@ -169,6 +170,7 @@ static const ErrorType *errorTypeList[] =
&JsonFormatError,
&KernelError,
&ServiceError,
&ExecuteError,
&RuntimeError,
&InvalidError,
&UnhandledError,

View File

@ -85,6 +85,7 @@ ERROR_DECLARE(FileInfoError);
ERROR_DECLARE(JsonFormatError);
ERROR_DECLARE(KernelError);
ERROR_DECLARE(ServiceError);
ERROR_DECLARE(ExecuteError);
ERROR_DECLARE(RuntimeError);
ERROR_DECLARE(InvalidError);
ERROR_DECLARE(UnhandledError);

368
src/common/exec.c Normal file
View File

@ -0,0 +1,368 @@
/***********************************************************************************************************************************
Execute Process
***********************************************************************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "common/assert.h"
#include "common/debug.h"
#include "common/log.h"
#include "common/exec.h"
#include "common/io/handleRead.h"
#include "common/io/handleWrite.h"
#include "common/io/io.h"
#include "common/io/read.intern.h"
#include "common/io/write.intern.h"
#include "common/wait.h"
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
struct Exec
{
MemContext *memContext; // Mem context
String *command; // Command to execute
StringList *param; // List of parameters to pass to command
const String *name; // Name to display in log/error messages
TimeMSec timeout; // Timeout for any i/o operation (read, write, etc.)
pid_t processId; // Process id of the child process
int handleRead; // Read handle
int handleWrite; // Write handle
int handleError; // Error handle
IoHandleRead *ioReadHandle; // Handle read driver
IoWrite *ioWriteHandle; // Handle write interface
IoRead *ioReadExec; // Wrapper for handle read interface
IoWrite *ioWriteExec; // Wrapper for handle write interface
};
/***********************************************************************************************************************************
New object
***********************************************************************************************************************************/
Exec *
execNew(const String *command, const StringList *param, const String *name, TimeMSec timeout)
{
FUNCTION_DEBUG_BEGIN(logLevelDebug)
FUNCTION_DEBUG_PARAM(STRING, command);
FUNCTION_DEBUG_PARAM(STRING_LIST, param);
FUNCTION_DEBUG_PARAM(STRING, name);
FUNCTION_DEBUG_PARAM(TIME_MSEC, timeout);
FUNCTION_TEST_ASSERT(command != NULL);
FUNCTION_TEST_ASSERT(name != NULL);
FUNCTION_TEST_ASSERT(timeout > 0);
FUNCTION_DEBUG_END();
Exec *this = NULL;
MEM_CONTEXT_NEW_BEGIN("Exec")
{
this = memNew(sizeof(Exec));
this->memContext = MEM_CONTEXT_NEW();
this->command = strDup(command);
// Parameter list is optional but if not specified we need to build one with the command
if (param == NULL)
this->param = strLstNew();
else
this->param = strLstDup(param);
// The first parameter must be the command
strLstInsert(this->param, 0, this->command);
this->name = strDup(name);
this->timeout = timeout;
}
MEM_CONTEXT_NEW_END();
FUNCTION_DEBUG_RESULT(EXEC, this);
}
/***********************************************************************************************************************************
Execute command
***********************************************************************************************************************************/
void
execOpen(Exec *this)
{
FUNCTION_DEBUG_BEGIN(logLevelDebug)
FUNCTION_DEBUG_PARAM(EXEC, this);
FUNCTION_TEST_ASSERT(this != NULL);
FUNCTION_DEBUG_END();
// Create pipes to communicate with the subprocess. The names of the pipes are from the perspective of the parent process since
// the child process will use them only briefly before exec'ing.
int pipeRead[2];
int pipeWrite[2];
int pipeError[2];
THROW_ON_SYS_ERROR(pipe(pipeRead) == -1, KernelError, "unable to create read pipe");
THROW_ON_SYS_ERROR(pipe(pipeWrite) == -1, KernelError, "unable to create write pipe");
THROW_ON_SYS_ERROR(pipe(pipeError) == -1, KernelError, "unable to create write pipe");
// Fork the subprocess
this->processId = fork();
// Exec command in the child process
if (this->processId == 0)
{
// Disable logging and close log file. The new process will reinitialize logging if needed.
logInit(logLevelOff, logLevelOff, logLevelOff, false);
// Assign stdout to the input side of the read pipe and close the unused handle
dup2(pipeRead[1], STDOUT_FILENO);
close(pipeRead[0]);
// Assign stdin to the output side of the write pipe and close the unused handle
dup2(pipeWrite[0], STDIN_FILENO);
close(pipeWrite[1]);
// Assign stderr to the input side of the error pipe and close the unused handle
dup2(pipeError[1], STDERR_FILENO);
close(pipeError[0]);
// Execute the binary. This statement will not return if it is successful
execvp(strPtr(this->command), (char ** const)strLstPtr(this->param));
// If we got here then there was an error. We can't use a throw as we normally would because we have already shutdown
// logging and we don't want to execute exit paths that might free parent resources which we still have references to.
fprintf(stderr, "unable to execute '%s': [%d] %s\n", strPtr(this->command), errno, strerror(errno));
exit(errorTypeCode(&ExecuteError));
}
// Close the unused handles
close(pipeRead[1]);
close(pipeWrite[0]);
close(pipeError[1]);
// Store the handles we'll use and need to close when the process terminates
this->handleRead = pipeRead[0];
this->handleWrite = pipeWrite[1];
this->handleError = pipeError[0];
// Assign handles to io interfaces
this->ioReadHandle = ioHandleReadNew(strNewFmt("%s read", strPtr(this->name)), this->handleRead, this->timeout);
this->ioWriteHandle = ioHandleWriteIo(ioHandleWriteNew(strNewFmt("%s write", strPtr(this->name)), this->handleWrite));
ioWriteOpen(this->ioWriteHandle);
// Create wrapper interfaces that check process state
this->ioReadExec = ioReadNewP(this, .read = (IoReadInterfaceRead)execRead, .eof = (IoReadInterfaceEof)execEof);
ioReadOpen(this->ioReadExec);
this->ioWriteExec = ioWriteNewP(this, .write = (IoWriteInterfaceWrite)execWrite);
ioWriteOpen(this->ioWriteExec);
// Set a callback so the handles will get freed
memContextCallback(this->memContext, (MemContextCallback)execFree, this);
FUNCTION_DEBUG_RESULT_VOID();
}
/***********************************************************************************************************************************
Check if the process is still running
This should be called when anything unexpected happens while reading or writing, including errors and eof. If this function returns
then the original error should be rethrown.
***********************************************************************************************************************************/
static void
execCheck(Exec *this)
{
FUNCTION_DEBUG_BEGIN(logLevelTrace);
FUNCTION_DEBUG_PARAM(EXEC, this);
FUNCTION_DEBUG_ASSERT(this != NULL);
FUNCTION_DEBUG_END();
int processStatus;
int processResult;
THROW_ON_SYS_ERROR(
(processResult = waitpid(this->processId, &processStatus, WNOHANG)) == -1, ExecuteError, "unable to wait on child process");
if (processResult != 0)
{
// Clear the process id so we don't try to wait for this process on free
this->processId = 0;
// If the process exited normally
if (WIFEXITED(processStatus))
{
// Get data from stderr to help diagnose the problem
IoRead *ioReadError = ioHandleReadIo(ioHandleReadNew(strNewFmt("%s error", strPtr(this->name)), this->handleError, 0));
ioReadOpen(ioReadError);
String *errorStr = strTrim(strNewBuf(ioReadBuf(ioReadError)));
// Throw the error with as much information as is available
THROWP_FMT(
errorTypeFromCode(WEXITSTATUS(processStatus)), "%s terminated unexpectedly [%d]%s%s", strPtr(this->name),
WEXITSTATUS(processStatus), strSize(errorStr) > 0 ? ": " : "", strSize(errorStr) > 0 ? strPtr(errorStr) : "");
}
// If the process did not exit normally then it must have been a signal
THROW_FMT(ExecuteError, "%s terminated unexpectedly on signal %d", strPtr(this->name), WTERMSIG(processStatus));
}
FUNCTION_DEBUG_RESULT_VOID();
}
/***********************************************************************************************************************************
Read from the process
***********************************************************************************************************************************/
void
execRead(Exec *this, Buffer *buffer, bool block)
{
FUNCTION_DEBUG_BEGIN(logLevelTrace);
FUNCTION_DEBUG_PARAM(EXEC, this);
FUNCTION_DEBUG_PARAM(BUFFER, buffer);
FUNCTION_DEBUG_PARAM(BOOL, block);
FUNCTION_DEBUG_ASSERT(this != NULL);
FUNCTION_DEBUG_ASSERT(buffer != NULL);
FUNCTION_DEBUG_END();
TRY_BEGIN()
{
ioHandleRead(this->ioReadHandle, buffer, block);
}
CATCH_ANY()
{
execCheck(this);
RETHROW();
}
TRY_END();
FUNCTION_DEBUG_RESULT_VOID();
}
/***********************************************************************************************************************************
Write to the process
***********************************************************************************************************************************/
void
execWrite(Exec *this, Buffer *buffer)
{
FUNCTION_DEBUG_BEGIN(logLevelTrace);
FUNCTION_DEBUG_PARAM(EXEC, this);
FUNCTION_DEBUG_PARAM(BUFFER, buffer);
FUNCTION_DEBUG_ASSERT(this != NULL);
FUNCTION_DEBUG_ASSERT(buffer != NULL);
FUNCTION_DEBUG_END();
TRY_BEGIN()
{
ioWrite(this->ioWriteHandle, buffer);
ioWriteFlush(this->ioWriteHandle);
}
CATCH_ANY()
{
execCheck(this);
RETHROW();
}
TRY_END();
FUNCTION_DEBUG_RESULT_VOID();
}
/***********************************************************************************************************************************
Is the process eof?
***********************************************************************************************************************************/
bool
execEof(Exec *this)
{
FUNCTION_DEBUG_BEGIN(logLevelTrace);
FUNCTION_DEBUG_PARAM(EXEC, this);
FUNCTION_DEBUG_ASSERT(this != NULL);
FUNCTION_DEBUG_END();
// Check that the process is still running on eof
if (ioHandleReadEof(this->ioReadHandle))
execCheck(this);
FUNCTION_DEBUG_RESULT(BOOL, false);
}
/***********************************************************************************************************************************
Get read interface
***********************************************************************************************************************************/
IoRead *
execIoRead(const Exec *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(EXEC, this);
FUNCTION_TEST_ASSERT(this != NULL);
FUNCTION_TEST_END();
FUNCTION_TEST_RESULT(IO_READ, this->ioReadExec);
}
/***********************************************************************************************************************************
Get write interface
***********************************************************************************************************************************/
IoWrite *
execIoWrite(const Exec *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(EXEC, this);
FUNCTION_TEST_ASSERT(this != NULL);
FUNCTION_TEST_END();
FUNCTION_TEST_RESULT(IO_WRITE, this->ioWriteExec);
}
/***********************************************************************************************************************************
Free the object
***********************************************************************************************************************************/
void
execFree(Exec *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(EXEC, this);
FUNCTION_TEST_END();
if (this != NULL)
{
// Close the io handles
close(this->handleRead);
close(this->handleWrite);
close(this->handleError);
// Wait for the child to exit. We don't really care how it exits as long as it does.
if (this->processId != 0)
{
MEM_CONTEXT_TEMP_BEGIN()
{
int processResult = 0;
Wait *wait = waitNew(this->timeout);
do
{
THROW_ON_SYS_ERROR(
(processResult = waitpid(this->processId, NULL, WNOHANG)) == -1, ExecuteError,
"unable to wait on child process");
}
while (processResult == 0 && waitMore(wait));
// If the process did not exit then error -- else we may end up with a collection of zombie processes
if (processResult == 0)
THROW_FMT(ExecuteError, "%s did not exit when expected", strPtr(this->name));
}
MEM_CONTEXT_TEMP_END();
}
// Free mem context
memContextCallbackClear(this->memContext);
memContextFree(this->memContext);
}
FUNCTION_TEST_RESULT_VOID();
}

54
src/common/exec.h Normal file
View File

@ -0,0 +1,54 @@
/***********************************************************************************************************************************
Execute Process
Executes a child process and allows the calling process to communicate with it using read/write io.
This object is specially tailored to implement the protocol layer and may or may not be generally applicable to general purpose
execution.
***********************************************************************************************************************************/
#ifndef COMMON_EXEC_H
#define COMMON_EXEC_H
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
typedef struct Exec Exec;
#include "common/io/read.h"
#include "common/io/write.h"
#include "common/time.h"
#include "common/type/stringList.h"
/***********************************************************************************************************************************
Constructor
***********************************************************************************************************************************/
Exec *execNew(const String *command, const StringList *param, const String *name, TimeMSec timeout);
/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
void execOpen(Exec *this);
void execRead(Exec *this, Buffer *buffer, bool block);
void execWrite(Exec *this, Buffer *buffer);
/***********************************************************************************************************************************
Getters
***********************************************************************************************************************************/
bool execEof(Exec *this);
IoRead *execIoRead(const Exec *this);
IoWrite *execIoWrite(const Exec *this);
/***********************************************************************************************************************************
Destructor
***********************************************************************************************************************************/
void execFree(Exec *this);
/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
#define FUNCTION_DEBUG_EXEC_TYPE \
Exec *
#define FUNCTION_DEBUG_EXEC_FORMAT(value, buffer, bufferSize) \
objToLog(value, "Exec", buffer, bufferSize)
#endif

View File

@ -88,7 +88,7 @@ ioHandleRead(IoHandleRead *this, Buffer *buffer, bool block)
// Determine if there is data to be read
int result = select(this->handle + 1, &selectSet, NULL, NULL, &timeoutSelect);
THROW_ON_SYS_ERROR_FMT(result == -1, AssertError, "unable to select from %s", strPtr(this->name));
THROW_ON_SYS_ERROR_FMT(result == -1, FileReadError, "unable to select from %s", strPtr(this->name));
// If no data read after time allotted then error
if (!result)

View File

@ -5227,6 +5227,8 @@ static const EmbeddedModule embeddedModule[] =
"push @EXPORT, qw(ERROR_KERNEL);\n"
"use constant ERROR_SERVICE => 101;\n"
"push @EXPORT, qw(ERROR_SERVICE);\n"
"use constant ERROR_EXECUTE => 102;\n"
"push @EXPORT, qw(ERROR_EXECUTE);\n"
"use constant ERROR_RUNTIME => 122;\n"
"push @EXPORT, qw(ERROR_RUNTIME);\n"
"use constant ERROR_INVALID => 123;\n"

View File

@ -231,6 +231,13 @@ unit:
common/io/http/header: full
common/io/http/query: full
# ----------------------------------------------------------------------------------------------------------------------------
- name: exec
total: 1
coverage:
common/exec: full
# ----------------------------------------------------------------------------------------------------------------------------
- name: encode
total: 1

View File

@ -0,0 +1,77 @@
/***********************************************************************************************************************************
Execute Process
***********************************************************************************************************************************/
/***********************************************************************************************************************************
Test Run
***********************************************************************************************************************************/
void
testRun(void)
{
FUNCTION_HARNESS_VOID();
// *****************************************************************************************************************************
if (testBegin("Exec"))
{
Exec *exec = NULL;
TEST_ASSIGN(exec, execNew(strNew("catt"), NULL, strNew("cat"), 1000), "invalid exec");
TEST_RESULT_VOID(execOpen(exec), "open invalid exec");
TEST_RESULT_VOID(ioWriteLine(execIoWrite(exec), EMPTY_STR), "write invalid exec");
sleep(1);
TEST_ERROR(
ioWriteFlush(execIoWrite(exec)), ExecuteError,
"cat terminated unexpectedly [102]: unable to execute 'catt': [2] No such file or directory");
TEST_RESULT_VOID(execFree(exec), "free exec");
// -------------------------------------------------------------------------------------------------------------------------
TEST_ASSIGN(exec, execNew(strNew("cat"), NULL, strNew("cat"), 1000), "new cat exec");
TEST_RESULT_VOID(execOpen(exec), "open cat exec");
String *message = strNew("ACKBYACK");
TEST_RESULT_VOID(ioWriteLine(execIoWrite(exec), message), "write cat exec");
ioWriteFlush(execIoWrite(exec));
TEST_RESULT_STR(strPtr(ioReadLine(execIoRead(exec))), strPtr(message), "read cat exec");
TEST_RESULT_VOID(execFree(exec), "free exec");
// -------------------------------------------------------------------------------------------------------------------------
TEST_ASSIGN(exec, execNew(strNew("cat"), NULL, strNew("cat"), 1000), "new cat exec");
TEST_RESULT_VOID(execOpen(exec), "open cat exec");
close(exec->handleWrite);
TEST_ERROR(strPtr(ioReadLine(execIoRead(exec))), UnknownError, "cat terminated unexpectedly [0]");
TEST_RESULT_VOID(execFree(exec), "free exec");
// -------------------------------------------------------------------------------------------------------------------------
TEST_ASSIGN(exec, execNew(strNew("cat"), NULL, strNew("cat"), 1000), "new cat exec");
TEST_RESULT_VOID(execOpen(exec), "open cat exec");
kill(exec->processId, SIGKILL);
TEST_ERROR(strPtr(ioReadLine(execIoRead(exec))), ExecuteError, "cat terminated unexpectedly on signal 9");
TEST_RESULT_VOID(execFree(exec), "free exec");
// -------------------------------------------------------------------------------------------------------------------------
TEST_ASSIGN(exec, execNew(strNew("cat"), strLstAddZ(strLstNew(), "-b"), strNew("cat"), 1000), "new cat exec");
TEST_RESULT_VOID(execOpen(exec), "open cat exec");
TEST_RESULT_VOID(ioWriteLine(execIoWrite(exec), message), "write cat exec");
ioWriteFlush(execIoWrite(exec));
TEST_RESULT_STR(strPtr(ioReadLine(execIoRead(exec))), " 1\tACKBYACK", "read cat exec");
TEST_RESULT_VOID(execFree(exec), "free exec");
// -------------------------------------------------------------------------------------------------------------------------
TEST_ASSIGN(exec, execNew(strNew("sleep"), strLstAddZ(strLstNew(), "2"), strNew("sleep"), 1000), "new sleep exec");
TEST_RESULT_VOID(execOpen(exec), "open cat exec");
TEST_ERROR(execFree(exec), ExecuteError, "sleep did not exit when expected");
TEST_ERROR(ioReadLine(execIoRead(exec)), FileReadError, "unable to select from sleep read: [9] Bad file descriptor");
ioWriteLine(execIoWrite(exec), strNew(""));
TEST_ERROR(ioWriteFlush(execIoWrite(exec)), FileWriteError, "unable to write to sleep write: [9] Bad file descriptor");
TEST_RESULT_VOID(execFree(exec), "sleep exited as expected");
TEST_RESULT_VOID(execFree(NULL), "free null exec");
}
FUNCTION_HARNESS_RESULT_VOID();
}