1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-20 04:59:25 +02:00

The archive-push command is now partially coded in C.

This allows the PostgreSQL archive_command to run significantly faster when processing status messages from the asynchronous archive process.
This commit is contained in:
David Steele 2018-01-17 15:52:00 -05:00
parent a4c058d070
commit 2cc9b2287b
12 changed files with 450 additions and 252 deletions

View File

@ -13,6 +13,10 @@
<release date="XXXX-XX-XX" version="2.00dev" title="UNDER DEVELOPMENT">
<release-core-list>
<release-feature-list>
<release-item>
<p>The <cmd>archive-push</cmd> command is now partially coded in C which allows the <postgres/> <file>archive_command</file> to run significantly faster when processing status messages from the asynchronous archive process.</p>
</release-item>
<release-item>
<release-item-contributor-list>
<release-item-contributor id="shang.cynthia"/>

View File

@ -286,8 +286,8 @@ sub processQueue
my $iCode = exceptionCode($EVAL_ERROR);
my $strMessage = exceptionMessage($EVAL_ERROR);
# Error all ready jobs
foreach my $strWalFile (@{$self->readyList()})
# Error all queued jobs
foreach my $strWalFile (@{$stryWalFile})
{
$self->walStatusWrite(
WAL_STATUS_ERROR, $strWalFile, $iCode, $strMessage);

View File

@ -71,44 +71,13 @@ sub process
my $strWalPath = dirname(walPath($strWalPathFile, cfgOption(CFGOPT_DB_PATH, false), cfgCommandName(cfgCommandGet())));
my $strWalFile = basename($strWalPathFile);
# Is the async client or server?
my $bClient = true;
# Start the async process and wait for WAL to complete
if (cfgOption(CFGOPT_ARCHIVE_ASYNC))
{
# Get the spool path
$self->{strSpoolPath} = storageSpool()->pathGet(STORAGE_SPOOL_ARCHIVE_OUT);
# Loop to check for status files and launch async process
my $bPushed = false;
my $oWait = waitInit(cfgOption(CFGOPT_ARCHIVE_TIMEOUT));
$self->{bConfessOnError} = false;
do
{
# Check WAL status
$bPushed = $self->walStatus($self->{strSpoolPath}, $strWalFile, $self->{bConfessOnError});
# If not found then launch async process
if (!$bPushed)
{
# Load module dynamically
require pgBackRest::Archive::Push::Async;
$bClient = (new pgBackRest::Archive::Push::Async(
$strWalPath, $self->{strSpoolPath}, $self->{strBackRestBin}))->process();
}
$self->{bConfessOnError} = true;
}
while ($bClient && !$bPushed && waitMore($oWait));
if (!$bPushed && $bClient)
{
confess &log(ERROR,
"unable to push WAL ${strWalFile} asynchronously after " . cfgOption(CFGOPT_ARCHIVE_TIMEOUT) . " second(s)",
ERROR_ARCHIVE_TIMEOUT);
}
# Load module dynamically
require pgBackRest::Archive::Push::Async;
(new pgBackRest::Archive::Push::Async(
$strWalPath, storageSpool()->pathGet(STORAGE_SPOOL_ARCHIVE_OUT), $self->{strBackRestBin}))->process();
}
# Else push synchronously
else
@ -129,15 +98,10 @@ sub process
else
{
archivePushFile($strWalPath, $strWalFile, cfgOption(CFGOPT_COMPRESS), cfgOption(CFGOPT_COMPRESS_LEVEL));
&log(INFO, "pushed WAL segment ${strWalFile}");
}
}
# Only print the message if this is the async client or the WAL file was pushed synchronously
if ($bClient)
{
&log(INFO, "pushed WAL segment ${strWalFile}" . (cfgOption(CFGOPT_ARCHIVE_ASYNC) ? ' asynchronously' : ''));
}
# Return from function and log return values if any
return logDebugReturn
(
@ -146,106 +110,6 @@ sub process
);
}
####################################################################################################################################
# walStatus
#
# Read a WAL status file and return success or raise a warning or error.
####################################################################################################################################
sub walStatus
{
my $self = shift;
# Assign function parameters, defaults, and log debug info
my
(
$strOperation,
$strSpoolPath,
$strWalFile,
$bConfessOnError,
) =
logDebugParam
(
__PACKAGE__ . '->walStatus', \@_,
{name => 'strSpoolPath'},
{name => 'strWalFile'},
{name => 'bConfessOnError', default => true},
);
# Default result is false
my $bResult = false;
# Find matching status files
my @stryStatusFile = storageSpool()->list(
$strSpoolPath, {strExpression => '^' . $strWalFile . '\.(ok|error)$', bIgnoreMissing => true});
if (@stryStatusFile > 0)
{
# If more than one status file was found then assert - this could be a bug in the async process
if (@stryStatusFile > 1)
{
confess &log(ASSERT,
"multiple status files found in ${strSpoolPath} for ${strWalFile}: " . join(', ', @stryStatusFile));
}
# Read the status file
my $rstrWalStatus = storageSpool()->get("${strSpoolPath}/$stryStatusFile[0]");
my @stryWalStatus = split("\n", defined($$rstrWalStatus) ? $$rstrWalStatus : '');
# Status file must have at least two lines if it has content
my $iCode;
my $strMessage;
# Parse status content
if (@stryWalStatus != 0)
{
if (@stryWalStatus < 2)
{
confess &log(ASSERT, "$stryStatusFile[0] content must have at least two lines:\n" . join("\n", @stryWalStatus));
}
$iCode = shift(@stryWalStatus);
$strMessage = join("\n", @stryWalStatus);
}
# Process ok files
if ($stryStatusFile[0] =~ /\.ok$/)
{
# If there is content in the status file it is a warning
if (@stryWalStatus != 0)
{
# If error code is not success, then this was a renamed .error file
if ($iCode != 0)
{
$strMessage =
"WAL segment ${strWalFile} was not pushed due to error and was manually skipped:\n" . $strMessage;
}
&log(WARN, $strMessage);
}
$bResult = true;
}
# Process error files
elsif ($bConfessOnError)
{
# Error files must have content
if (@stryWalStatus == 0)
{
confess &log(ASSERT, "$stryStatusFile[0] has no content");
}
confess &log(ERROR, $strMessage, $iCode);
}
}
# Return from function and log return values if any
return logDebugReturn
(
$strOperation,
{name => 'bResult', value => $bResult}
);
}
####################################################################################################################################
# readyList
#

View File

@ -3,6 +3,7 @@ CFLAGS=-I. -Wfatal-errors -Wall -Wextra -Wwrite-strings -Wno-clobbered -std=c99
DESTDIR=
pgbackrest: \
command/archive/push/push.o \
command/command.o \
common/error.o \
common/errorType.o \
@ -29,6 +30,7 @@ pgbackrest: \
storage/storage.o \
main.o
$(CC) $(CFLAGS) -o pgbackrest \
command/archive/push/push.o \
command/command.o \
common/error.o \
common/errorType.o \

View File

@ -0,0 +1,186 @@
/***********************************************************************************************************************************
Archive Push Command
***********************************************************************************************************************************/
#include <libgen.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "common/error.h"
#include "common/log.h"
#include "common/memContext.h"
#include "common/regExp.h"
#include "common/type.h"
#include "common/wait.h"
#include "config/config.h"
#include "perl/exec.h"
#include "storage/helper.h"
/***********************************************************************************************************************************
Check for ok/error status files in the spool out directory
***********************************************************************************************************************************/
static bool
walStatus(const String *walSegment, bool confessOnError)
{
bool result = false;
MEM_CONTEXT_TEMP_BEGIN()
{
StringList *fileList = storageList(
storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_OUT), strNewFmt("^%s\\.(ok|error)$", strPtr(walSegment)), true);
if (fileList != NULL && strLstSize(fileList) > 0)
{
// If more than one status file was found then assert - this could be a bug in the async process
if (strLstSize(fileList) != 1)
{
THROW(
AssertError, "multiple status files found in '%s' for WAL segment '%s'",
strPtr(storagePath(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_OUT))), strPtr(walSegment));
}
// Get the status file content
const String *statusFile = strLstGet(fileList, 0);
String *content = strNewBuf(
storageGet(storageSpool(), strNewFmt("%s/%s", STORAGE_SPOOL_ARCHIVE_OUT, strPtr(statusFile)), false));
// Get the code and message if the file has content
int code = 0;
const String *message = NULL;
if (strSize(content) != 0)
{
// Find the line feed after the error code -- should be the first one
const char *linefeedPtr = strchr(strPtr(content), '\n');
// Error if linefeed not found
if (linefeedPtr == NULL)
THROW(FormatError, "%s content must have at least two lines", strPtr(statusFile));
// Error if message is zero-length
if (strlen(linefeedPtr + 1) == 0)
THROW(FormatError, "%s message must be > 0", strPtr(statusFile));
// Get contents
code = varIntForce(varNewStr(strNewN(strPtr(content), linefeedPtr - strPtr(content))));
message = strTrim(strNew(linefeedPtr + 1));
}
// Process OK files
if (strEndsWithZ(statusFile, ".ok"))
{
// If there is content in the status file it is a warning
if (strSize(content) != 0)
{
// If error code is not success, then this was a renamed .error file
if (code != 0)
{
message = strNewFmt(
"WAL segment '%s' was not pushed due to error [%d] and was manually skipped: %s", strPtr(walSegment),
code, strPtr(message));
}
LOG_WARN(strPtr(message));
}
result = true;
}
else if (confessOnError)
{
// Error status files must have content
if (strSize(content) == 0)
THROW(AssertError, "status file '%s' has no content", strPtr(statusFile));
// Throw error using the code passed in the file
THROW_CODE(code, strPtr(message));
}
}
}
MEM_CONTEXT_TEMP_END();
return result;
}
/***********************************************************************************************************************************
Push a WAL segment to the repository
***********************************************************************************************************************************/
void
cmdArchivePush()
{
MEM_CONTEXT_TEMP_BEGIN()
{
// Make sure there is a parameter to retrieve the WAL segment from
const StringList *commandParam = cfgCommandParam();
if (strLstSize(commandParam) != 1)
THROW(ParamRequiredError, "WAL segment to push required");
// Get the segment name
String *walSegment = strBase(strLstGet(commandParam, 0));
if (cfgOptionBool(cfgOptArchiveAsync))
{
bool pushed = false; // Has the WAL segment been pushed yet?
bool confessOnError = false; // Should we confess errors?
// Loop and wait for the WAL segment to be pushed
Wait *wait = waitNew(cfgOptionDbl(cfgOptArchiveTimeout));
do
{
// Check if the WAL segment has been pushed. Errors will not be confessed on the first try to allow the async
// process a chance to fix them.
pushed = walStatus(walSegment, confessOnError);
// If the WAL segment has not already been pushed then start the async process to push it
if (!pushed)
{
// Only want to see warnings and errors from async process
cfgOptionSet(cfgOptLogLevelConsole, cfgSourceParam, varNewStrZ("warn"));
// Async process is currently implemented in Perl
int processId = 0;
if ((processId = fork()) == 0)
{
perlExec(perlCommand());
}
// Wait for async process to exit (this should happen quickly) and report any errors
else
{
int processStatus;
THROW_ON_SYS_ERROR(
waitpid(processId, &processStatus, 0) != processId, AssertError, "unable to find perl child process");
if (WEXITSTATUS(processStatus) != 0)
THROW(AssertError, "perl exited with error %d", WEXITSTATUS(processStatus));
}
}
// Now that the async process has been launched, confess any errors that are found
confessOnError = true;
}
while (!pushed && waitMore(wait));
waitFree(wait);
// If the WAL segment was not pushed then error
if (!pushed)
{
THROW(
ArchiveTimeoutError, "unable to push WAL segment '%s' asynchronously after %lg second(s)", strPtr(walSegment),
cfgOptionDbl(cfgOptArchiveTimeout));
}
// Log success
LOG_INFO("pushed WAL segment %s asynchronously", strPtr(walSegment));
}
else
THROW(AssertError, "archive-push in C does not support synchronous mode");
}
MEM_CONTEXT_TEMP_END();
}

View File

@ -0,0 +1,12 @@
/***********************************************************************************************************************************
Archive Push Command
***********************************************************************************************************************************/
#ifndef COMMAND_ARCHIVE_PUSH_H
#define COMMAND_ARCHIVE_PUSH_H
/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
void cmdArchivePush();
#endif

View File

@ -37,9 +37,12 @@ ERROR_DEFINE(ERROR_CODE_MIN + 07, OptionInvalidValueError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 12, OptionRequiredError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 16, FileOpenError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 17, FileReadError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 18, ParamRequiredError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 19, ArchiveMismatchError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 23, CommandInvalidError, FormatError);
ERROR_DEFINE(ERROR_CODE_MIN + 28, PathOpenError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 39, FileWriteError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 57, ArchiveTimeoutError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 69, MemoryError, RuntimeError);
ERROR_DEFINE(ERROR_CODE_MIN + 70, CipherError, FormatError);
@ -59,9 +62,12 @@ static const ErrorType *errorTypeList[] =
&OptionRequiredError,
&FileOpenError,
&FileReadError,
&ParamRequiredError,
&ArchiveMismatchError,
&CommandInvalidError,
&PathOpenError,
&FileWriteError,
&ArchiveTimeoutError,
&MemoryError,
&CipherError,

View File

@ -23,9 +23,12 @@ ERROR_DECLARE(OptionInvalidValueError);
ERROR_DECLARE(OptionRequiredError);
ERROR_DECLARE(FileOpenError);
ERROR_DECLARE(FileReadError);
ERROR_DECLARE(ParamRequiredError);
ERROR_DECLARE(ArchiveMismatchError);
ERROR_DECLARE(CommandInvalidError);
ERROR_DECLARE(PathOpenError);
ERROR_DECLARE(FileWriteError);
ERROR_DECLARE(ArchiveTimeoutError);
ERROR_DECLARE(MemoryError);
ERROR_DECLARE(CipherError);

View File

@ -4,6 +4,8 @@ Main
#include <stdio.h>
#include <stdlib.h>
#include "command/archive/push/push.h"
#include "command/command.h"
#include "common/error.h"
#include "common/exit.h"
#include "config/config.h"
@ -28,6 +30,15 @@ int main(int argListSize, const char *argList[])
exit(0);
}
// Archive push command. Currently only implements to local operations of async archive push.
// -------------------------------------------------------------------------------------------------------------------------
if (cfgCommand() == cfgCmdArchivePush && cfgOptionBool(cfgOptArchiveAsync))
{
cmdBegin();
cmdArchivePush();
exit(exitSafe(false));
}
// Execute Perl for commands not implemented in C
// -------------------------------------------------------------------------------------------------------------------------
perlExec(perlCommand());

View File

@ -691,6 +691,16 @@ my $oTestDef =
'Protocol/Local/Minion' => TESTDEF_COVERAGE_PARTIAL,
},
},
{
&TESTDEF_NAME => 'push',
&TESTDEF_TOTAL => 2,
&TESTDEF_C => true,
&TESTDEF_COVERAGE =>
{
'command/archive/push/push' => TESTDEF_COVERAGE_FULL,
},
},
{
&TESTDEF_NAME => 'stop',
&TESTDEF_TOTAL => 7,

View File

@ -22,6 +22,7 @@ use pgBackRest::Archive::Push::File;
use pgBackRest::Common::Exception;
use pgBackRest::Common::Lock;
use pgBackRest::Common::Log;
use pgBackRest::Common::Wait;
use pgBackRest::Config::Config;
use pgBackRest::DbVersion;
use pgBackRest::Protocol::Helper;
@ -323,7 +324,7 @@ sub run
}
################################################################################################################################
if ($self->begin("ArchivePushAsync->walStatusWrite() & ArchivePush->walStatus()"))
if ($self->begin("ArchivePushAsync->walStatusWrite()"))
{
my $oPush = new pgBackRest::Archive::Push::Push();
@ -338,32 +339,13 @@ sub run
#---------------------------------------------------------------------------------------------------------------------------
my $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
$self->testResult(sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, 0, "${strSegment} WAL no status");
#---------------------------------------------------------------------------------------------------------------------------
# Generate a normal ok
$oPushAsync->walStatusWrite(WAL_STATUS_OK, $strSegment);
# Check status
$self->testResult(sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, 1, "${strSegment} WAL ok");
#---------------------------------------------------------------------------------------------------------------------------
# Generate a bogus warning ok (if content is present there must be two lines)
$strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
storageTest()->put("$self->{strSpoolPath}/${strSegment}.ok", "Test Warning");
# Check status
$self->testException(
sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, ERROR_ASSERT,
"${strSegment}.ok content must have at least two lines:\nTest Warning");
#---------------------------------------------------------------------------------------------------------------------------
# Generate a valid warning ok
$oPushAsync->walStatusWrite(WAL_STATUS_OK, $strSegment, 0, 'Test Warning');
# Check status
$self->testResult(sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, 1, "${strSegment} WAL warning ok");
#---------------------------------------------------------------------------------------------------------------------------
# Generate an invalid error
$self->testException(
@ -376,53 +358,10 @@ sub run
sub {$oPushAsync->walStatusWrite(WAL_STATUS_ERROR, $strSegment, ERROR_ASSERT)}, ERROR_ASSERT,
"strMessage must be set when iCode is set");
#---------------------------------------------------------------------------------------------------------------------------
# Generate an invalid error
storageTest()->put("$self->{strSpoolPath}/${strSegment}.error");
# Check status (will error because there are now two status files)
$self->testException(
sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment);}, ERROR_ASSERT,
"multiple status files found in " . $self->testPath() . "/repo/archive/db/out for ${strSegment}:" .
" ${strSegment}.error, ${strSegment}.ok");
#---------------------------------------------------------------------------------------------------------------------------
# Remove the ok file
storageTest()->remove("$self->{strSpoolPath}/${strSegment}.ok");
# Check status
$self->testException(
sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment);}, ERROR_ASSERT, "${strSegment}.error has no content");
#---------------------------------------------------------------------------------------------------------------------------
# Generate a valid error
$oPushAsync->walStatusWrite(
WAL_STATUS_ERROR, $strSegment, ERROR_ARCHIVE_DUPLICATE, "WAL segment ${strSegment} already exists in the archive");
# Check status
$self->testException(sub {
$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, ERROR_ARCHIVE_DUPLICATE,
"WAL segment ${strSegment} already exists in the archive");
#---------------------------------------------------------------------------------------------------------------------------
# Change the error file to an ok file
storageTest()->move("$self->{strSpoolPath}/${strSegment}.error", "$self->{strSpoolPath}/${strSegment}.ok");
# Check status
$self->testResult(
sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment);}, 1,
"${strSegment} WAL warning ok (converted from .error)");
#---------------------------------------------------------------------------------------------------------------------------
# Generate a normal ok
$strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
$oPushAsync->walStatusWrite(WAL_STATUS_OK, $strSegment);
#---------------------------------------------------------------------------------------------------------------------------
$strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
# Check status
$self->testResult(sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, 0, "${strSegment} WAL no status");
}
################################################################################################################################
@ -693,57 +632,27 @@ sub run
$self->optionTestSet(CFGOPT_SPOOL_PATH, $self->{strRepoPath});
$self->configTestLoad(CFGCMD_ARCHIVE_PUSH);
# Write an error file and verify that it doesn't error the first time around
$strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
storageTest()->pathCreate($self->{strSpoolPath}, {bCreateParent => true});
storageTest()->put("$self->{strSpoolPath}/${strSegment}.error", ERROR_ARCHIVE_TIMEOUT . "\ntest error");
$self->testException(
sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, ERROR_ARCHIVE_TIMEOUT,
"test error");
$self->testResult($oPush->{bConfessOnError}, true, "went through error loop");
$self->testResult(
sub {walSegmentFind(storageRepo(), $self->{strArchiveId}, $strSegment)}, '[undef]',
"${strSegment} WAL not in archive");
#---------------------------------------------------------------------------------------------------------------------------
# Write an OK file so the async process is not actually started
$strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
storageTest()->put("$self->{strSpoolPath}/${strSegment}.ok");
$self->testResult(
sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, 0,
"${strSegment} WAL pushed async from synthetic ok file");
$self->testResult(
sub {walSegmentFind(storageRepo(), $self->{strArchiveId}, $strSegment)}, '[undef]',
"${strSegment} WAL not in archive");
#---------------------------------------------------------------------------------------------------------------------------
$strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
$self->walGenerate($self->{strWalPath}, PG_VERSION_94, 1, $strSegment);
$self->testResult(sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, 0, "${strSegment} WAL pushed async");
exit if ($iProcessId != $PID);
$self->testResult(sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, 0, "hit lock - already running");
# Wait for child process to exit
if ($iProcessId == $PID)
{
waitpid(-1, 0);
}
else
{
exit 0;
}
$self->testResult(
sub {walSegmentFind(storageRepo(), $self->{strArchiveId}, $strSegment)}, "${strSegment}-$self->{strWalHash}",
sub {walSegmentFind(storageRepo(), $self->{strArchiveId}, $strSegment, 5)}, "${strSegment}-$self->{strWalHash}",
"${strSegment} WAL in archive");
$self->walRemove($self->{strWalPath}, $strSegment);
#---------------------------------------------------------------------------------------------------------------------------
$strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
$self->optionTestSet(CFGOPT_ARCHIVE_TIMEOUT, 1);
$self->configTestLoad(CFGCMD_ARCHIVE_PUSH);
$self->testException(
sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, ERROR_ARCHIVE_TIMEOUT,
"unable to push WAL ${strSegment} asynchronously after 1 second(s)");
exit if ($iProcessId != $PID);
#---------------------------------------------------------------------------------------------------------------------------
$strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++);
$self->walGenerate($self->{strWalPath}, PG_VERSION_94, 1, $strSegment);
@ -753,10 +662,32 @@ sub run
$self->optionTestSet(CFGOPT_ARCHIVE_TIMEOUT, 5);
$self->configTestLoad(CFGCMD_ARCHIVE_PUSH);
$self->testException(
sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, ERROR_FILE_READ,
"remote process on '" . BOGUS . "' terminated.*");
exit if ($iProcessId != $PID);
# Wait for error file to appear
my $oWait = waitInit(10);
my $strErrorFile = STORAGE_SPOOL_ARCHIVE_OUT . "/${strSegment}.error";
do
{
$self->testResult(sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, 0, 'process connect error');
# Wait for child process to exit
if ($iProcessId == $PID)
{
waitpid(-1, 0);
}
else
{
exit 0;
}
}
while (!storageSpool()->exists($strErrorFile) && waitMore($oWait));
# Check contents of error file
my $strErrorFileContents = ${storageSpool()->get($strErrorFile)};
$self->testResult(
$strErrorFileContents =~ ("42\nremote process on '" . BOGUS . "' terminated.*"), true, "check error file contents");
# Disable async archiving
$self->optionTestClear(CFGOPT_BACKUP_HOST);

View File

@ -0,0 +1,169 @@
/***********************************************************************************************************************************
Test Archive Push Command
***********************************************************************************************************************************/
#include <stdlib.h>
#include "config/parse.h"
/***********************************************************************************************************************************
Test Run
***********************************************************************************************************************************/
void testRun()
{
// *****************************************************************************************************************************
if (testBegin("walStatus()"))
{
StringList *argList = strLstNew();
strLstAddZ(argList, "pgbackrest");
strLstAdd(argList, strNewFmt("--spool-path=%s", testPath()));
strLstAddZ(argList, "--archive-async");
strLstAddZ(argList, "--archive-timeout=1");
strLstAddZ(argList, "--stanza=db");
strLstAddZ(argList, "archive-push");
configParse(strLstSize(argList), strLstPtr(argList));
// -------------------------------------------------------------------------------------------------------------------------
String *segment = strNew("000000010000000100000001");
TEST_RESULT_BOOL(walStatus(segment, false), false, "directory and status file not present");
// -------------------------------------------------------------------------------------------------------------------------
mkdir(strPtr(strNewFmt("%s/archive", testPath())), 0750);
mkdir(strPtr(strNewFmt("%s/archive/db", testPath())), 0750);
mkdir(strPtr(strNewFmt("%s/archive/db/out", testPath())), 0750);
TEST_RESULT_BOOL(walStatus(segment, false), false, "status file not present");
// -------------------------------------------------------------------------------------------------------------------------
storagePut(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew(BOGUS_STR)));
TEST_ERROR(walStatus(segment, false), FormatError, "000000010000000100000001.ok content must have at least two lines");
storagePut(
storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew(BOGUS_STR "\n")));
TEST_ERROR(walStatus(segment, false), FormatError, "000000010000000100000001.ok message must be > 0");
storagePut(
storageSpool(),
strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew(BOGUS_STR "\nmessage")));
TEST_ERROR(walStatus(segment, false), FormatError, "unable to convert str 'BOGUS' to int");
storagePut(
storageSpool(),
strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew("0\nwarning")));
TEST_RESULT_BOOL(walStatus(segment, false), true, "ok file with warning");
testLogResult("P00 WARN: warning");
storagePut(
storageSpool(),
strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew("25\nerror")));
TEST_RESULT_BOOL(walStatus(segment, false), true, "error status renamed to ok");
testLogResult(
"P00 WARN: WAL segment '000000010000000100000001' was not pushed due to error [25] and was manually skipped: error");
// -------------------------------------------------------------------------------------------------------------------------
storagePut(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment)), bufNewStr(strNew("")));
TEST_ERROR(
walStatus(segment, false), AssertError,
strPtr(
strNewFmt(
"multiple status files found in '%s/archive/db/out' for WAL segment '000000010000000100000001'", testPath())));
unlink(strPtr(storagePath(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)))));
TEST_ERROR(walStatus(segment, true), AssertError, "status file '000000010000000100000001.error' has no content");
storagePut(
storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment)), bufNewStr(strNew("25\nmessage")));
TEST_ERROR(walStatus(segment, true), AssertError, "message");
TEST_RESULT_BOOL(walStatus(segment, false), false, "suppress error");
unlink(strPtr(storagePath(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment)))));
}
// *****************************************************************************************************************************
if (testBegin("cmdArchivePush()"))
{
int processId = getpid();
StringList *argList = strLstNew();
strLstAddZ(argList, "pgbackrest");
strLstAddZ(argList, "--archive-timeout=1");
strLstAddZ(argList, "--stanza=db");
strLstAddZ(argList, "archive-push");
configParse(strLstSize(argList), strLstPtr(argList));
TEST_ERROR(cmdArchivePush(), ParamRequiredError, "WAL segment to push required");
// -------------------------------------------------------------------------------------------------------------------------
strLstAddZ(argList, "000000010000000100000001");
configParse(strLstSize(argList), strLstPtr(argList));
TEST_ERROR(cmdArchivePush(), AssertError, "archive-push in C does not support synchronous mode");
// Test that a bogus perl bin generates the correct errors
// -------------------------------------------------------------------------------------------------------------------------
String *perlBin = strNewFmt("%s/perl-test.sh", testPath());
strLstAdd(argList, strNewFmt("--perl-bin=%s", strPtr(perlBin)));
strLstAdd(argList, strNewFmt("--spool-path=%s", testPath()));
strLstAddZ(argList, "--archive-async");
configParse(strLstSize(argList), strLstPtr(argList));
TRY_BEGIN()
{
cmdArchivePush();
THROW(AssertError, "error should have been thrown"); // {uncoverable - test should not get here}
}
CATCH_ANY()
{
// Exit with error if this is the child process
if (getpid() != processId)
exit(errorCode());
// Check expected error on the parent process
TEST_RESULT_INT(errorCode(), errorTypeCode(&AssertError), "error code matches after failed Perl exec");
TEST_RESULT_STR(errorMessage(), "perl exited with error 25", "error message matches after failed Perl exec");
}
TRY_END();
// Write a blank script for the perl bin and make sure the process times out
// -------------------------------------------------------------------------------------------------------------------------
Storage *storage = storageNew(strNew(testPath()), 0750, 65536, NULL);
storagePut(storage, perlBin, bufNewStr(strNew("")));
TEST_ERROR(
cmdArchivePush(), ArchiveTimeoutError,
"unable to push WAL segment '000000010000000100000001' asynchronously after 1 second(s)");
// Write out a bogus .error file to make sure it is ignored on the first loop. The perl bin will write the real one when it
// executes.
// -------------------------------------------------------------------------------------------------------------------------
String *errorFile = storagePath(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_OUT "/000000010000000100000001.error"));
mkdir(strPtr(strNewFmt("%s/archive", testPath())), 0750);
mkdir(strPtr(strNewFmt("%s/archive/db", testPath())), 0750);
mkdir(strPtr(strNewFmt("%s/archive/db/out", testPath())), 0750);
storagePut(storageSpool(), errorFile, bufNewStr(strNew("")));
storagePut(storage, perlBin, bufNewStr(strNewFmt(
"set -e\n"
"echo '25' > %s\n"
"echo 'generic error message' >> %s\n",
strPtr(errorFile), strPtr(errorFile))));
TEST_ERROR(cmdArchivePush(), AssertError, "generic error message");
unlink(strPtr(errorFile));
// Modify script to write out a valid ok file
// -------------------------------------------------------------------------------------------------------------------------
storagePut(storage, perlBin, bufNewStr(strNewFmt(
"set -e\n"
"touch %s\n",
strPtr(storagePath(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_OUT "/000000010000000100000001.ok"))))));
TEST_RESULT_VOID(cmdArchivePush(), "successful push");
testLogResult("P00 INFO: pushed WAL segment 000000010000000100000001 asynchronously");
}
}