1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-09-16 09:06:18 +02:00

The archive-get command is implemented entirely in C.

This new implementation should behave exactly like the old Perl code with the exception of a few updated log messages.

Remove as much of the Perl code as possible without breaking other commands.
This commit is contained in:
David Steele
2019-02-27 23:03:02 +02:00
parent 9367cc461c
commit db4b447be8
27 changed files with 560 additions and 905 deletions

View File

@@ -41,6 +41,10 @@
</release-bug-list>
<release-improvement-list>
<release-item>
<p>The <cmd>archive-get</cmd> command is implemented entirely in C.</p>
</release-item>
<release-item>
<release-item-contributor-list>
<release-item-ideator id="james.badger"/>
@@ -55,10 +59,6 @@
<p>Migrate <cmd>local</cmd> and <cmd>remote</cmd> commands to C.</p>
</release-item>
<release-item>
<p>Migrate remote <cmd>archive-get</cmd> command to C.</p>
</release-item>
<release-item>
<p>Add separate <cmd>archive-get-async</cmd> command.</p>
</release-item>

View File

@@ -2975,7 +2975,7 @@
</execute>
<execute user="postgres" output="y">
<exe-cmd>cat /var/log/pgbackrest/demo-archive-get-async.log</exe-cmd>
<exe-highlight>got WAL file [0-F]{24} from archive</exe-highlight>
<exe-highlight>found [0-F]{24} in the archive</exe-highlight>
</execute>
</execute-list>
</section>

View File

@@ -1,199 +0,0 @@
####################################################################################################################################
# ARCHIVE GET ASYNC MODULE
####################################################################################################################################
package pgBackRest::Archive::Get::Async;
use parent 'pgBackRest::Archive::Get::Get';
use strict;
use warnings FATAL => qw(all);
use Carp qw(confess);
use English '-no_match_vars';
use pgBackRest::Common::Exception;
use pgBackRest::Common::Lock;
use pgBackRest::Common::Log;
use pgBackRest::Archive::Common;
use pgBackRest::Archive::Info;
use pgBackRest::Common::String;
use pgBackRest::Common::Wait;
use pgBackRest::Config::Config;
use pgBackRest::Db;
use pgBackRest::DbVersion;
use pgBackRest::LibC qw(:lock);
use pgBackRest::Protocol::Local::Process;
use pgBackRest::Protocol::Helper;
use pgBackRest::Storage::Helper;
use pgBackRest::Version;
####################################################################################################################################
# constructor
####################################################################################################################################
sub new
{
my $class = shift; # Class name
# Init object
my $self = $class->SUPER::new();
bless $self, $class;
# Assign function parameters, defaults, and log debug info
(
my $strOperation,
$self->{strSpoolPath},
$self->{strBackRestBin},
$self->{rstryWal},
) =
logDebugParam
(
__PACKAGE__ . '->new', \@_,
{name => 'strSpoolPath'},
{name => 'strBackRestBin', default => projectBin()},
{name => 'rstryWal'},
);
# Return from function and log return values if any
return logDebugReturn
(
$strOperation,
{name => 'self', value => $self}
);
}
####################################################################################################################################
# Create the spool directory and initialize the archive process.
####################################################################################################################################
sub initServer
{
my $self = shift;
# Assign function parameters, defaults, and log debug info
my ($strOperation) = logDebugParam(__PACKAGE__ . '->initServer');
# Initialize the archive process
$self->{oArchiveProcess} = new pgBackRest::Protocol::Local::Process(
CFGOPTVAL_LOCAL_TYPE_BACKUP, cfgOption(CFGOPT_PROTOCOL_TIMEOUT) < 60 ? cfgOption(CFGOPT_PROTOCOL_TIMEOUT) / 2 : 30,
$self->{strBackRestBin}, false);
$self->{oArchiveProcess}->hostAdd(1, cfgOption(CFGOPT_PROCESS_MAX));
# Return from function and log return values if any
return logDebugReturn($strOperation);
}
####################################################################################################################################
# Setup the server and process the queue. This function is separate from processQueue() for testing purposes.
####################################################################################################################################
sub process
{
my $self = shift;
# Assign function parameters, defaults, and log debug info
my ($strOperation) = logDebugParam(__PACKAGE__ . '->process');
# Open the log file
logFileSet(storageLocal(), cfgOption(CFGOPT_LOG_PATH) . '/' . cfgOption(CFGOPT_STANZA) . '-archive-get-async');
# There is no loop here because it seems wise to let the async process exit periodically. As the queue grows each async
# execution will naturally run longer. This behavior is also far easier to test.
$self->initServer();
$self->processQueue();
# Return from function and log return values if any
return logDebugReturn($strOperation);
}
####################################################################################################################################
# Get WAL from archive
####################################################################################################################################
sub processQueue
{
my $self = shift;
# Assign function parameters, defaults, and log debug info
my ($strOperation) = logDebugParam(__PACKAGE__ . '->processQueue');
# Queue the jobs
foreach my $strWalFile (@{$self->{rstryWal}})
{
$self->{oArchiveProcess}->queueJob(
1, 'default', $strWalFile, OP_ARCHIVE_GET_FILE, [$strWalFile, "$self->{strSpoolPath}/${strWalFile}", true]);
}
# Process jobs
my $iFoundTotal = 0;
my $iMissingTotal = 0;
my $iErrorTotal = 0;
&log(INFO,
'get ' . @{$self->{rstryWal}} . ' WAL file(s) from archive: ' .
${$self->{rstryWal}}[0] . (@{$self->{rstryWal}} > 1 ? "...${$self->{rstryWal}}[-1]" : ''));
eval
{
# Check for a stop lock
lockStopTest();
while (my $hyJob = $self->{oArchiveProcess}->process())
{
foreach my $hJob (@{$hyJob})
{
my $strWalFile = @{$hJob->{rParam}}[0];
my $iResult = @{$hJob->{rResult}}[0];
# If error then write out an error file
if (defined($hJob->{oException}))
{
archiveAsyncStatusWrite(
WAL_STATUS_ERROR, $self->{strSpoolPath}, $strWalFile, $hJob->{oException}->code(),
$hJob->{oException}->message());
$iErrorTotal++;
&log(WARN,
"could not get WAL file ${strWalFile} from archive (will be retried): [" .
$hJob->{oException}->code() . "] " . $hJob->{oException}->message());
}
# Else write a '.ok' file to indicate that the WAL was not found but there was no error
elsif ($iResult == 1)
{
archiveAsyncStatusWrite(WAL_STATUS_OK, $self->{strSpoolPath}, $strWalFile);
$iMissingTotal++;
&log(DETAIL, "WAL file ${strWalFile} not found in archive", undef, undef, undef, $hJob->{iProcessId});
}
# Else success so just output a log message
else
{
$iFoundTotal++;
&log(DETAIL, "got WAL file ${strWalFile} from archive", undef, undef, undef, $hJob->{iProcessId});
}
}
}
return 1;
}
or do
{
# Get error info
my $iCode = exceptionCode($EVAL_ERROR);
my $strMessage = exceptionMessage($EVAL_ERROR);
# Error all queued jobs
foreach my $strWalFile (@{$self->{rstryWal}})
{
archiveAsyncStatusWrite(WAL_STATUS_ERROR, $self->{strSpoolPath}, $strWalFile, $iCode, $strMessage);
}
};
return logDebugReturn
(
$strOperation,
{name => 'iNewTotal', value => scalar(@{$self->{rstryWal}})},
{name => 'iFoundTotal', value => $iFoundTotal},
{name => 'iMissingTotal', value => $iMissingTotal},
{name => 'iErrorTotal', value => $iErrorTotal}
);
}
1;

View File

@@ -128,75 +128,4 @@ sub archiveGetCheck
push @EXPORT, qw(archiveGetCheck);
####################################################################################################################################
# archiveGetFile
#
# Copy a file from the archive.
####################################################################################################################################
sub archiveGetFile
{
# Assign function parameters, defaults, and log debug info
my
(
$strOperation,
$strSourceArchive,
$strDestinationFile,
$bAtomic,
) =
logDebugParam
(
__PACKAGE__ . '::archiveGetFile', \@_,
{name => 'strSourceArchive'},
{name => 'strDestinationFile'},
{name => 'bAtomic'},
);
lockStopTest();
# Get the repo storage
my $oStorageRepo = storageRepo();
# Construct absolute path to the WAL file when it is relative
$strDestinationFile = walPath($strDestinationFile, cfgOption(CFGOPT_PG_PATH, false), cfgCommandName(cfgCommandGet()));
# Get the wal segment filename
my ($strArchiveId, $strArchiveFile, $strCipherPass) = archiveGetCheck(undef, undef, $strSourceArchive, false);
# If there are no matching archive files then there are two possibilities:
# 1) The end of the archive stream has been reached, this is normal and a 1 will be returned
# 2) There is a hole in the archive stream and a hard error should be returned. However, holes are possible due to async
# archiving - so when to report a hole? Since a hard error will cause PG to terminate, for now treat as case #1.
my $iResult = 0;
if (!defined($strArchiveFile))
{
$iResult = 1;
}
else
{
# Determine if the source file is already compressed
my $bSourceCompressed = $strArchiveFile =~ ('^.*\.' . COMPRESS_EXT . '$') ? true : false;
# Copy the archive file to the requested location
# If the file is encrypted, then the passphrase from the info file is required to open the archive file in the repo
$oStorageRepo->copy(
$oStorageRepo->openRead(
STORAGE_REPO_ARCHIVE . "/${strArchiveId}/${strArchiveFile}", {bProtocolCompress => !$bSourceCompressed,
strCipherPass => defined($strCipherPass) ? $strCipherPass : undef}),
storageDb()->openWrite(
$strDestinationFile,
{bAtomic => $bAtomic, rhyFilter => $bSourceCompressed ?
[{strClass => STORAGE_FILTER_GZIP, rxyParam => [{strCompressType => STORAGE_DECOMPRESS}]}] : undef}));
}
# Return from function and log return values if any
return logDebugReturn
(
$strOperation,
{name => 'iResult', value => $iResult}
);
}
push @EXPORT, qw(archiveGetFile);
1;

View File

@@ -1,76 +0,0 @@
####################################################################################################################################
# ARCHIVE GET MODULE
####################################################################################################################################
package pgBackRest::Archive::Get::Get;
use parent 'pgBackRest::Archive::Base';
use strict;
use warnings FATAL => qw(all);
use Carp qw(confess);
use English '-no_match_vars';
use Exporter qw(import);
our @EXPORT = qw();
use Fcntl qw(SEEK_CUR O_RDONLY O_WRONLY O_CREAT);
use File::Basename qw(dirname basename);
use Scalar::Util qw(blessed);
use pgBackRest::Common::Exception;
use pgBackRest::Common::Lock;
use pgBackRest::Common::Log;
use pgBackRest::Archive::Common;
use pgBackRest::Archive::Get::File;
use pgBackRest::Archive::Info;
use pgBackRest::Common::String;
use pgBackRest::Common::Wait;
use pgBackRest::Config::Config;
use pgBackRest::Db;
use pgBackRest::DbVersion;
use pgBackRest::Protocol::Helper;
use pgBackRest::Protocol::Storage::Helper;
use pgBackRest::Storage::Base;
use pgBackRest::Storage::Filter::Gzip;
use pgBackRest::Storage::Helper;
####################################################################################################################################
# process
####################################################################################################################################
sub process
{
my $self = shift;
# Assign function parameters, defaults, and log debug info
my
(
$strOperation,
$rstryCommandArg,
) =
logDebugParam
(
__PACKAGE__ . '->process', \@_,
{name => 'rstryCommandArg'},
);
my $iResult = 0;
# Make sure the command happens on the db side
if (!isDbLocal())
{
confess &log(ERROR, cfgCommandName(CFGCMD_ARCHIVE_GET) . ' operation must run on db host', ERROR_HOST_INVALID);
}
# Load module dynamically
require pgBackRest::Archive::Get::Async;
(new pgBackRest::Archive::Get::Async(
storageSpool()->pathGet(STORAGE_SPOOL_ARCHIVE_IN), $self->{strBackRestBin}, $rstryCommandArg))->process();
# Return from function and log return values if any
return logDebugReturn
(
$strOperation,
{name => 'iResult', value => $iResult, trace => true},
);
}
1;

View File

@@ -12,7 +12,6 @@ use Exporter qw(import);
use File::Basename;
use pgBackRest::Archive::Common;
use pgBackRest::Archive::Get::Get;
use pgBackRest::Backup::Common;
use pgBackRest::Backup::File;
use pgBackRest::Backup::Info;

View File

@@ -12,7 +12,6 @@ use File::Basename qw(dirname);
use Scalar::Util qw(looks_like_number);
use pgBackRest::Archive::Common;
use pgBackRest::Archive::Get::Get;
use pgBackRest::Archive::Info;
use pgBackRest::Common::Exception;
use pgBackRest::Common::Ini;

View File

@@ -80,17 +80,6 @@ sub main
new pgBackRest::Archive::Push::Push()->process($stryCommandArg[0]);
}
# Process archive-get-async command
# --------------------------------------------------------------------------------------------------------------------------
elsif (cfgCommandTest(CFGCMD_ARCHIVE_GET_ASYNC))
{
# Load module dynamically
require pgBackRest::Archive::Get::Get;
pgBackRest::Archive::Get::Get->import();
$iResult = new pgBackRest::Archive::Get::Get()->process(\@stryCommandArg);
}
# Process remote command
# --------------------------------------------------------------------------------------------------------------------------
elsif (cfgCommandTest(CFGCMD_REMOTE))

View File

@@ -29,8 +29,6 @@ use constant OP_ARCHIVE_PUSH_CHECK => 'archiveP
push @EXPORT, qw(OP_ARCHIVE_PUSH_CHECK);
# Archive File Module
use constant OP_ARCHIVE_GET_FILE => 'archiveGetFile';
push @EXPORT, qw(OP_ARCHIVE_GET_FILE);
use constant OP_ARCHIVE_PUSH_FILE => 'archivePushFile';
push @EXPORT, qw(OP_ARCHIVE_PUSH_FILE);

View File

@@ -8,7 +8,6 @@ use strict;
use warnings FATAL => qw(all);
use Carp qw(confess);
use pgBackRest::Archive::Get::File;
use pgBackRest::Archive::Push::File;
use pgBackRest::Backup::File;
use pgBackRest::Common::Log;
@@ -55,7 +54,6 @@ sub init
# Create anonymous subs for each command
my $hCommandMap =
{
&OP_ARCHIVE_GET_FILE => sub {archiveGetFile(@{shift()})},
&OP_ARCHIVE_PUSH_FILE => sub {archivePushFile(@{shift()})},
&OP_BACKUP_FILE => sub {backupFile(@{shift()})},
&OP_RESTORE_FILE => sub {restoreFile(@{shift()})},

View File

@@ -60,6 +60,7 @@ SRCS = \
command/archive/common.c \
command/archive/get/file.c \
command/archive/get/get.c \
command/archive/get/protocol.c \
command/archive/push/push.c \
command/help/help.c \
command/info/info.c \
@@ -186,9 +187,12 @@ command/archive/common.o: command/archive/common.c command/archive/common.h comm
command/archive/get/file.o: command/archive/get/file.c command/archive/common.h command/archive/get/file.h command/control/control.h common/assert.h common/debug.h common/error.auto.h common/error.h common/ini.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h compress/gzip.h compress/gzipDecompress.h config/config.auto.h config/config.h config/define.auto.h config/define.h crypto/cipherBlock.h crypto/crypto.h info/infoArchive.h info/infoPg.h postgres/interface.h storage/fileRead.h storage/fileWrite.h storage/helper.h storage/info.h storage/storage.h
$(CC) $(CFLAGS) -c command/archive/get/file.c -o command/archive/get/file.o
command/archive/get/get.o: command/archive/get/get.c command/archive/common.h command/archive/get/file.h command/command.h common/assert.h common/debug.h common/error.auto.h common/error.h common/fork.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h common/wait.h config/config.auto.h config/config.h config/define.auto.h config/define.h config/exec.h crypto/crypto.h perl/exec.h postgres/interface.h storage/fileRead.h storage/fileWrite.h storage/helper.h storage/info.h storage/storage.h
command/archive/get/get.o: command/archive/get/get.c command/archive/common.h command/archive/get/file.h command/archive/get/protocol.h command/command.h common/assert.h common/debug.h common/error.auto.h common/error.h common/fork.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h common/wait.h config/config.auto.h config/config.h config/define.auto.h config/define.h config/exec.h crypto/crypto.h perl/exec.h postgres/interface.h protocol/client.h protocol/command.h protocol/helper.h protocol/parallel.h protocol/parallelJob.h protocol/server.h storage/fileRead.h storage/fileWrite.h storage/helper.h storage/info.h storage/storage.h
$(CC) $(CFLAGS) -c command/archive/get/get.c -o command/archive/get/get.o
command/archive/get/protocol.o: command/archive/get/protocol.c command/archive/get/file.h command/archive/get/protocol.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h crypto/crypto.h protocol/server.h storage/fileRead.h storage/fileWrite.h storage/helper.h storage/info.h storage/storage.h
$(CC) $(CFLAGS) -c command/archive/get/protocol.c -o command/archive/get/protocol.o
command/archive/push/push.o: command/archive/push/push.c command/archive/common.h command/command.h common/assert.h common/debug.h common/error.auto.h common/error.h common/fork.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h common/wait.h config/config.auto.h config/config.h config/define.auto.h config/define.h config/load.h perl/exec.h storage/fileRead.h storage/fileWrite.h storage/helper.h storage/info.h storage/storage.h
$(CC) $(CFLAGS) -c command/archive/push/push.c -o command/archive/push/push.o
@@ -204,7 +208,7 @@ command/help/help.o: command/help/help.c common/assert.h common/debug.h common/e
command/info/info.o: command/info/info.c command/archive/common.h command/info/info.h common/assert.h common/debug.h common/error.auto.h common/error.h common/ini.h common/io/filter/filter.h common/io/filter/group.h common/io/handleWrite.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/json.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h crypto/crypto.h crypto/hash.h info/info.h info/infoArchive.h info/infoBackup.h info/infoPg.h perl/exec.h postgres/interface.h storage/fileRead.h storage/fileWrite.h storage/helper.h storage/info.h storage/storage.h
$(CC) $(CFLAGS) -c command/info/info.c -o command/info/info.o
command/local/local.o: command/local/local.c common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/handleRead.h common/io/handleWrite.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h config/protocol.h protocol/client.h protocol/command.h protocol/helper.h protocol/server.h
command/local/local.o: command/local/local.c command/archive/get/protocol.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/handleRead.h common/io/handleWrite.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h config/protocol.h protocol/client.h protocol/command.h protocol/helper.h protocol/server.h
$(CC) $(CFLAGS) -c command/local/local.c -o command/local/local.o
command/remote/remote.o: command/remote/remote.c common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/handleRead.h common/io/handleWrite.h common/io/read.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h config/protocol.h protocol/client.h protocol/command.h protocol/helper.h protocol/server.h storage/driver/remote/protocol.h

View File

@@ -1,5 +1,5 @@
/***********************************************************************************************************************************
Archive Push Command
Archive Common
***********************************************************************************************************************************/
#include <stdint.h>
#include <stdlib.h>
@@ -23,6 +23,19 @@ STRING_EXTERN(WAL_SEGMENT_PARTIAL_REGEXP_STR, WAL_SEGMENT_
STRING_EXTERN(WAL_SEGMENT_DIR_REGEXP_STR, WAL_SEGMENT_DIR_REGEXP);
STRING_EXTERN(WAL_SEGMENT_FILE_REGEXP_STR, WAL_SEGMENT_FILE_REGEXP);
/***********************************************************************************************************************************
Get the correct spool queue based on the archive mode
***********************************************************************************************************************************/
static const String *
archiveAsyncSpoolQueue(ArchiveMode archiveMode)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(ENUM, archiveMode);
FUNCTION_TEST_END();
FUNCTION_TEST_RETURN((archiveMode == archiveModeGet ? STORAGE_SPOOL_ARCHIVE_IN_STR : STORAGE_SPOOL_ARCHIVE_OUT_STR));
}
/***********************************************************************************************************************************
Check for ok/error status files in the spool in/out directory
***********************************************************************************************************************************/
@@ -39,7 +52,7 @@ archiveAsyncStatus(ArchiveMode archiveMode, const String *walSegment, bool confe
MEM_CONTEXT_TEMP_BEGIN()
{
const String *spoolQueue = (archiveMode == archiveModeGet ? STORAGE_SPOOL_ARCHIVE_IN_STR : STORAGE_SPOOL_ARCHIVE_OUT_STR);
const String *spoolQueue = archiveAsyncSpoolQueue(archiveMode);
String *okFile = strNewFmt("%s.ok", strPtr(walSegment));
String *errorFile = strNewFmt("%s.error", strPtr(walSegment));
@@ -128,6 +141,70 @@ archiveAsyncStatus(ArchiveMode archiveMode, const String *walSegment, bool confe
FUNCTION_LOG_RETURN(BOOL, result);
}
/***********************************************************************************************************************************
Write an error status file
***********************************************************************************************************************************/
void
archiveAsyncStatusErrorWrite(ArchiveMode archiveMode, const String *walSegment, int code, const String *message, bool skipIfOk)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(ENUM, archiveMode);
FUNCTION_LOG_PARAM(STRING, walSegment);
FUNCTION_LOG_PARAM(INT, code);
FUNCTION_LOG_PARAM(STRING, message);
FUNCTION_LOG_PARAM(BOOL, skipIfOk);
FUNCTION_LOG_END();
ASSERT(walSegment != NULL);
ASSERT(code != 0);
ASSERT(message != NULL);
MEM_CONTEXT_TEMP_BEGIN()
{
// Only write the file if we are not worried about ok files or if the ok files does not exist
if (!skipIfOk ||
!(storageExistsNP(
storageSpool(), strNewFmt("%s/%s.ok", strPtr(archiveAsyncSpoolQueue(archiveMode)), strPtr(walSegment))) ||
(archiveMode == archiveModeGet && storageExistsNP(
storageSpool(), strNewFmt("%s/%s", strPtr(archiveAsyncSpoolQueue(archiveMode)), strPtr(walSegment))))))
{
storagePutNP(
storageNewWriteNP(
storageSpoolWrite(), strNewFmt("%s/%s.error", strPtr(archiveAsyncSpoolQueue(archiveMode)), strPtr(walSegment))),
bufNewStr(strNewFmt("%d\n%s", code, strPtr(message))));
}
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Write an ok status file
***********************************************************************************************************************************/
void
archiveAsyncStatusOkWrite(ArchiveMode archiveMode, const String *walSegment)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(ENUM, archiveMode);
FUNCTION_LOG_PARAM(STRING, walSegment);
FUNCTION_LOG_END();
ASSERT(walSegment != NULL);
MEM_CONTEXT_TEMP_BEGIN()
{
// Write file
storagePutNP(
storageNewWriteNP(
storageSpoolWrite(), strNewFmt("%s/%s.ok", strPtr(archiveAsyncSpoolQueue(archiveMode)), strPtr(walSegment))),
NULL);
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Is the segment partial?
***********************************************************************************************************************************/

View File

@@ -50,6 +50,9 @@ WAL segment constants
Functions
***********************************************************************************************************************************/
bool archiveAsyncStatus(ArchiveMode archiveMode, const String *walSegment, bool confessOnError);
void archiveAsyncStatusOkWrite(ArchiveMode archiveMode, const String *walSegment);
void archiveAsyncStatusErrorWrite(
ArchiveMode archiveMode, const String *walSegment, int code, const String *message, bool skipIfOk);
bool walIsPartial(const String *walSegment);
bool walIsSegment(const String *walSegment);

View File

@@ -112,11 +112,15 @@ archiveGetCheck(const String *archiveFile, CipherType cipherType, const String *
Copy a file from the archive to the specified destination
***********************************************************************************************************************************/
int
archiveGetFile(const String *archiveFile, const String *walDestination, CipherType cipherType, const String *cipherPass)
archiveGetFile(
const Storage *storage, const String *archiveFile, const String *walDestination, bool durable, CipherType cipherType,
const String *cipherPass)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(STORAGE, storage);
FUNCTION_LOG_PARAM(STRING, archiveFile);
FUNCTION_LOG_PARAM(STRING, walDestination);
FUNCTION_LOG_PARAM(BOOL, durable);
FUNCTION_LOG_PARAM(ENUM, cipherType);
// cipherPass omitted for security
FUNCTION_LOG_END();
@@ -138,8 +142,8 @@ archiveGetFile(const String *archiveFile, const String *walDestination, CipherTy
if (archiveGetCheckResult.archiveFileActual != NULL)
{
StorageFileWrite *destination = storageNewWriteP(
storageLocalWrite(), walDestination, .noCreatePath = true, .noSyncFile = true, .noSyncPath = true,
.noAtomic = true);
storage, walDestination, .noCreatePath = true, .noSyncFile = !durable, .noSyncPath = !durable,
.noAtomic = !durable);
// Add filters
IoFilterGroup *filterGroup = ioFilterGroupNew();

View File

@@ -6,10 +6,13 @@ Archive Get File
#include "common/type/string.h"
#include "crypto/crypto.h"
#include "storage/storage.h"
/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
int archiveGetFile(const String *archiveFile, const String *walDestination, CipherType cipherType, const String *cipherPass);
int archiveGetFile(
const Storage *storage, const String *archiveFile, const String *walDestination, bool durable, CipherType cipherType,
const String *cipherPass);
#endif

View File

@@ -9,6 +9,7 @@ Archive Get Command
#include "command/archive/common.h"
#include "command/archive/get/file.h"
#include "command/archive/get/protocol.h"
#include "command/command.h"
#include "common/debug.h"
#include "common/fork.h"
@@ -20,7 +21,8 @@ Archive Get Command
#include "config/exec.h"
#include "perl/exec.h"
#include "postgres/interface.h"
#include "storage/helper.h"
#include "protocol/helper.h"
#include "protocol/parallel.h"
#include "storage/helper.h"
/***********************************************************************************************************************************
@@ -96,7 +98,7 @@ queueNeed(const String *walSegment, bool found, size_t queueSize, size_t walSegm
}
/***********************************************************************************************************************************
Push a WAL segment to the repository
Get an archive file from the repository (WAL segment, history file, etc.)
***********************************************************************************************************************************/
int
cmdArchiveGet(void)
@@ -262,7 +264,8 @@ cmdArchiveGet(void)
// Get the archive file
result = archiveGetFile(
walSegment, walDestination, cipherType(cfgOptionStr(cfgOptRepoCipherType)), cfgOptionStr(cfgOptRepoCipherPass));
storageLocalWrite(), walSegment, walDestination, false, cipherType(cfgOptionStr(cfgOptRepoCipherType)),
cfgOptionStr(cfgOptRepoCipherPass));
}
// Log whether or not the file was found
@@ -275,3 +278,102 @@ cmdArchiveGet(void)
FUNCTION_LOG_RETURN(INT, result);
}
/***********************************************************************************************************************************
Async version of archive get that runs in parallel for performance
***********************************************************************************************************************************/
void
cmdArchiveGetAsync(void)
{
FUNCTION_LOG_VOID(logLevelDebug);
MEM_CONTEXT_TEMP_BEGIN()
{
// Check the parameters
const StringList *walSegmentList = cfgCommandParam();
if (strLstSize(walSegmentList) < 1)
THROW(ParamInvalidError, "at least one wal segment is required");
TRY_BEGIN()
{
LOG_INFO(
"get %u WAL file(s) from archive: %s%s", strLstSize(walSegmentList), strPtr(strLstGet(walSegmentList, 0)),
strLstSize(walSegmentList) == 1 ?
"" : strPtr(strNewFmt("...%s", strPtr(strLstGet(walSegmentList, strLstSize(walSegmentList) - 1)))));
// Create the parallel executor
ProtocolParallel *parallelExec = protocolParallelNew((TimeMSec)(cfgOptionDbl(cfgOptProtocolTimeout) * MSEC_PER_SEC) / 2);
for (unsigned int processIdx = 1; processIdx <= (unsigned int)cfgOptionInt(cfgOptProcessMax); processIdx++)
protocolParallelClientAdd(parallelExec, protocolLocalGet(protocolStorageTypeRepo, processIdx));
// Queue jobs in executor
for (unsigned int walSegmentIdx = 0; walSegmentIdx < strLstSize(walSegmentList); walSegmentIdx++)
{
const String *walSegment = strLstGet(walSegmentList, walSegmentIdx);
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_GET_STR);
protocolCommandParamAdd(command, varNewStr(walSegment));
protocolParallelJobAdd(parallelExec, protocolParallelJobNew(varNewStr(walSegment), command));
}
// Process jobs
do
{
unsigned int completed = protocolParallelProcess(parallelExec);
for (unsigned int jobIdx = 0; jobIdx < completed; jobIdx++)
{
// Get the job and job key
ProtocolParallelJob *job = protocolParallelResult(parallelExec);
const String *walSegment = varStr(protocolParallelJobKey(job));
// The job was successful
if (protocolParallelJobErrorCode(job) == 0)
{
// Get the archive file
if (varIntForce(protocolParallelJobResult(job)) == 0)
{
LOG_DETAIL("found %s in the archive", strPtr(walSegment));
}
// If it does not exist write an ok file to indicate that it was checked
else
{
LOG_DETAIL("unable to find %s in the archive", strPtr(walSegment));
archiveAsyncStatusOkWrite(archiveModeGet, walSegment);
}
}
// Else the job errored
else
{
LOG_WARN(
"could not get %s from the archive (will be retried): [%d] %s", strPtr(walSegment),
protocolParallelJobErrorCode(job), strPtr(protocolParallelJobErrorMessage(job)));
archiveAsyncStatusErrorWrite(
archiveModeGet, walSegment, protocolParallelJobErrorCode(job), protocolParallelJobErrorMessage(job),
false);
}
}
}
while (!protocolParallelDone(parallelExec));
}
CATCH_ANY()
{
// On any global error write the same error into every .error file unless the get was already successful
for (unsigned int walSegmentIdx = 0; walSegmentIdx < strLstSize(walSegmentList); walSegmentIdx++)
{
archiveAsyncStatusErrorWrite(
archiveModeGet, strLstGet(walSegmentList, walSegmentIdx), errorCode(), strNew(errorMessage()), true);
}
RETHROW();
}
TRY_END();
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN_VOID();
}

View File

@@ -8,5 +8,6 @@ Archive Get Command
Functions
***********************************************************************************************************************************/
int cmdArchiveGet(void);
void cmdArchiveGetAsync(void);
#endif

View File

@@ -0,0 +1,57 @@
/***********************************************************************************************************************************
Archive Get Protocol Handler
***********************************************************************************************************************************/
#include "command/archive/get/protocol.h"
#include "command/archive/get/file.h"
#include "common/debug.h"
#include "common/io/io.h"
#include "common/log.h"
#include "common/memContext.h"
#include "config/config.h"
#include "storage/helper.h"
/***********************************************************************************************************************************
Constants
***********************************************************************************************************************************/
STRING_EXTERN(PROTOCOL_COMMAND_ARCHIVE_GET_STR, PROTOCOL_COMMAND_ARCHIVE_GET);
/***********************************************************************************************************************************
Process protocol requests
***********************************************************************************************************************************/
bool
archiveGetProtocol(const String *command, const VariantList *paramList, ProtocolServer *server)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STRING, command);
FUNCTION_LOG_PARAM(VARIANT_LIST, paramList);
FUNCTION_LOG_PARAM(PROTOCOL_SERVER, server);
FUNCTION_LOG_END();
ASSERT(command != NULL);
// Get the repo storage in case it is remote and encryption settings need to be pulled down
storageRepo();
// Attempt to satisfy the request -- we may get requests that are meant for other handlers
bool found = true;
MEM_CONTEXT_TEMP_BEGIN()
{
if (strEq(command, PROTOCOL_COMMAND_ARCHIVE_GET_STR))
{
const String *walSegment = varStr(varLstGet(paramList, 0));
protocolServerResponse(
server,
varNewInt(
archiveGetFile(
storageSpoolWrite(), walSegment, strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strPtr(walSegment)), true,
cipherType(cfgOptionStr(cfgOptRepoCipherType)), cfgOptionStr(cfgOptRepoCipherPass))));
}
else
found = false;
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN(BOOL, found);
}

View File

@@ -0,0 +1,22 @@
/***********************************************************************************************************************************
Archive Get Protocol Handler
***********************************************************************************************************************************/
#ifndef COMMAND_ARCHIVE_GET_PROTOCOL_H
#define COMMAND_ARCHIVE_GET_PROTOCOL_H
#include "common/type/string.h"
#include "common/type/variantList.h"
#include "protocol/server.h"
/***********************************************************************************************************************************
Constants
***********************************************************************************************************************************/
#define PROTOCOL_COMMAND_ARCHIVE_GET "archiveGet"
STRING_DECLARE(PROTOCOL_COMMAND_ARCHIVE_GET_STR);
/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
bool archiveGetProtocol(const String *command, const VariantList *paramList, ProtocolServer *server);
#endif

View File

@@ -1,6 +1,7 @@
/***********************************************************************************************************************************
Local Command
***********************************************************************************************************************************/
#include "command/archive/get/protocol.h"
#include "common/debug.h"
#include "common/io/handleRead.h"
#include "common/io/handleWrite.h"
@@ -27,6 +28,7 @@ cmdLocal(int handleRead, int handleWrite)
ioWriteOpen(write);
ProtocolServer *server = protocolServerNew(name, PROTOCOL_SERVICE_LOCAL_STR, read, write);
protocolServerHandlerAdd(server, archiveGetProtocol);
protocolServerProcess(server);
}
MEM_CONTEXT_TEMP_END();

View File

@@ -65,7 +65,7 @@ main(int argListSize, const char *argList[])
// Local command. Currently only implements a subset.
// -------------------------------------------------------------------------------------------------------------------------
else if (cfgCommand() == cfgCmdLocal && strEqZ(cfgOptionStr(cfgOptCommand), cfgCommandName(cfgCmdArchiveGet)))
else if (cfgCommand() == cfgCmdLocal && strEqZ(cfgOptionStr(cfgOptCommand), cfgCommandName(cfgCmdArchiveGetAsync)))
{
cmdLocal(STDIN_FILENO, STDOUT_FILENO);
}
@@ -74,7 +74,9 @@ main(int argListSize, const char *argList[])
// -------------------------------------------------------------------------------------------------------------------------
else if (cfgCommand() == cfgCmdRemote &&
(strEqZ(cfgOptionStr(cfgOptCommand), cfgCommandName(cfgCmdArchiveGet)) ||
strEqZ(cfgOptionStr(cfgOptCommand), cfgCommandName(cfgCmdInfo))))
strEqZ(cfgOptionStr(cfgOptCommand), cfgCommandName(cfgCmdArchiveGetAsync)) ||
strEqZ(cfgOptionStr(cfgOptCommand), cfgCommandName(cfgCmdInfo)) ||
strEqZ(cfgOptionStr(cfgOptCommand), cfgCommandName(cfgCmdLocal))))
{
cmdRemote(STDIN_FILENO, STDOUT_FILENO);
}
@@ -86,6 +88,13 @@ main(int argListSize, const char *argList[])
result = cmdArchiveGet();
}
// Archive get async command
// -------------------------------------------------------------------------------------------------------------------------
else if (cfgCommand() == cfgCmdArchiveGetAsync)
{
cmdArchiveGetAsync();
}
// Archive push command. Currently only implements local operations of async archive push.
// -------------------------------------------------------------------------------------------------------------------------
else if (cfgCommand() == cfgCmdArchivePush && cfgOptionBool(cfgOptArchiveAsync))

View File

@@ -480,179 +480,6 @@ static const EmbeddedModule embeddedModule[] =
"\n"
"1;\n"
},
{
.name = "pgBackRest/Archive/Get/Async.pm",
.data =
"\n\n\n"
"package pgBackRest::Archive::Get::Async;\n"
"use parent 'pgBackRest::Archive::Get::Get';\n"
"\n"
"use strict;\n"
"use warnings FATAL => qw(all);\n"
"use Carp qw(confess);\n"
"use English '-no_match_vars';\n"
"\n"
"use pgBackRest::Common::Exception;\n"
"use pgBackRest::Common::Lock;\n"
"use pgBackRest::Common::Log;\n"
"use pgBackRest::Archive::Common;\n"
"use pgBackRest::Archive::Info;\n"
"use pgBackRest::Common::String;\n"
"use pgBackRest::Common::Wait;\n"
"use pgBackRest::Config::Config;\n"
"use pgBackRest::Db;\n"
"use pgBackRest::DbVersion;\n"
"use pgBackRest::LibC qw(:lock);\n"
"use pgBackRest::Protocol::Local::Process;\n"
"use pgBackRest::Protocol::Helper;\n"
"use pgBackRest::Storage::Helper;\n"
"use pgBackRest::Version;\n"
"\n\n\n\n"
"sub new\n"
"{\n"
"my $class = shift;\n"
"\n\n"
"my $self = $class->SUPER::new();\n"
"bless $self, $class;\n"
"\n\n"
"(\n"
"my $strOperation,\n"
"$self->{strSpoolPath},\n"
"$self->{strBackRestBin},\n"
"$self->{rstryWal},\n"
") =\n"
"logDebugParam\n"
"(\n"
"__PACKAGE__ . '->new', \\@_,\n"
"{name => 'strSpoolPath'},\n"
"{name => 'strBackRestBin', default => projectBin()},\n"
"{name => 'rstryWal'},\n"
");\n"
"\n\n"
"return logDebugReturn\n"
"(\n"
"$strOperation,\n"
"{name => 'self', value => $self}\n"
");\n"
"}\n"
"\n\n\n\n"
"sub initServer\n"
"{\n"
"my $self = shift;\n"
"\n\n"
"my ($strOperation) = logDebugParam(__PACKAGE__ . '->initServer');\n"
"\n\n"
"$self->{oArchiveProcess} = new pgBackRest::Protocol::Local::Process(\n"
"CFGOPTVAL_LOCAL_TYPE_BACKUP, cfgOption(CFGOPT_PROTOCOL_TIMEOUT) < 60 ? cfgOption(CFGOPT_PROTOCOL_TIMEOUT) / 2 : 30,\n"
"$self->{strBackRestBin}, false);\n"
"$self->{oArchiveProcess}->hostAdd(1, cfgOption(CFGOPT_PROCESS_MAX));\n"
"\n\n"
"return logDebugReturn($strOperation);\n"
"}\n"
"\n\n\n\n"
"sub process\n"
"{\n"
"my $self = shift;\n"
"\n\n"
"my ($strOperation) = logDebugParam(__PACKAGE__ . '->process');\n"
"\n\n"
"logFileSet(storageLocal(), cfgOption(CFGOPT_LOG_PATH) . '/' . cfgOption(CFGOPT_STANZA) . '-archive-get-async');\n"
"\n\n\n"
"$self->initServer();\n"
"$self->processQueue();\n"
"\n\n"
"return logDebugReturn($strOperation);\n"
"}\n"
"\n\n\n\n"
"sub processQueue\n"
"{\n"
"my $self = shift;\n"
"\n\n"
"my ($strOperation) = logDebugParam(__PACKAGE__ . '->processQueue');\n"
"\n\n"
"foreach my $strWalFile (@{$self->{rstryWal}})\n"
"{\n"
"$self->{oArchiveProcess}->queueJob(\n"
"1, 'default', $strWalFile, OP_ARCHIVE_GET_FILE, [$strWalFile, \"$self->{strSpoolPath}/${strWalFile}\", true]);\n"
"}\n"
"\n\n"
"my $iFoundTotal = 0;\n"
"my $iMissingTotal = 0;\n"
"my $iErrorTotal = 0;\n"
"\n"
"&log(INFO,\n"
"'get ' . @{$self->{rstryWal}} . ' WAL file(s) from archive: ' .\n"
"${$self->{rstryWal}}[0] . (@{$self->{rstryWal}} > 1 ? \"...${$self->{rstryWal}}[-1]\" : ''));\n"
"\n"
"eval\n"
"{\n"
"\n"
"lockStopTest();\n"
"\n"
"while (my $hyJob = $self->{oArchiveProcess}->process())\n"
"{\n"
"foreach my $hJob (@{$hyJob})\n"
"{\n"
"my $strWalFile = @{$hJob->{rParam}}[0];\n"
"my $iResult = @{$hJob->{rResult}}[0];\n"
"\n\n"
"if (defined($hJob->{oException}))\n"
"{\n"
"archiveAsyncStatusWrite(\n"
"WAL_STATUS_ERROR, $self->{strSpoolPath}, $strWalFile, $hJob->{oException}->code(),\n"
"$hJob->{oException}->message());\n"
"\n"
"$iErrorTotal++;\n"
"\n"
"&log(WARN,\n"
"\"could not get WAL file ${strWalFile} from archive (will be retried): [\" .\n"
"$hJob->{oException}->code() . \"] \" . $hJob->{oException}->message());\n"
"}\n"
"\n"
"elsif ($iResult == 1)\n"
"{\n"
"archiveAsyncStatusWrite(WAL_STATUS_OK, $self->{strSpoolPath}, $strWalFile);\n"
"\n"
"$iMissingTotal++;\n"
"\n"
"&log(DETAIL, \"WAL file ${strWalFile} not found in archive\", undef, undef, undef, $hJob->{iProcessId});\n"
"}\n"
"\n"
"else\n"
"{\n"
"$iFoundTotal++;\n"
"\n"
"&log(DETAIL, \"got WAL file ${strWalFile} from archive\", undef, undef, undef, $hJob->{iProcessId});\n"
"}\n"
"}\n"
"}\n"
"\n"
"return 1;\n"
"}\n"
"or do\n"
"{\n"
"\n"
"my $iCode = exceptionCode($EVAL_ERROR);\n"
"my $strMessage = exceptionMessage($EVAL_ERROR);\n"
"\n\n"
"foreach my $strWalFile (@{$self->{rstryWal}})\n"
"{\n"
"archiveAsyncStatusWrite(WAL_STATUS_ERROR, $self->{strSpoolPath}, $strWalFile, $iCode, $strMessage);\n"
"}\n"
"};\n"
"\n"
"return logDebugReturn\n"
"(\n"
"$strOperation,\n"
"{name => 'iNewTotal', value => scalar(@{$self->{rstryWal}})},\n"
"{name => 'iFoundTotal', value => $iFoundTotal},\n"
"{name => 'iMissingTotal', value => $iMissingTotal},\n"
"{name => 'iErrorTotal', value => $iErrorTotal}\n"
");\n"
"}\n"
"\n"
"1;\n"
},
{
.name = "pgBackRest/Archive/Get/File.pm",
.data =
@@ -770,132 +597,6 @@ static const EmbeddedModule embeddedModule[] =
"}\n"
"\n"
"push @EXPORT, qw(archiveGetCheck);\n"
"\n\n\n\n\n\n"
"sub archiveGetFile\n"
"{\n"
"\n"
"my\n"
"(\n"
"$strOperation,\n"
"$strSourceArchive,\n"
"$strDestinationFile,\n"
"$bAtomic,\n"
") =\n"
"logDebugParam\n"
"(\n"
"__PACKAGE__ . '::archiveGetFile', \\@_,\n"
"{name => 'strSourceArchive'},\n"
"{name => 'strDestinationFile'},\n"
"{name => 'bAtomic'},\n"
");\n"
"\n"
"lockStopTest();\n"
"\n\n"
"my $oStorageRepo = storageRepo();\n"
"\n\n"
"$strDestinationFile = walPath($strDestinationFile, cfgOption(CFGOPT_PG_PATH, false), cfgCommandName(cfgCommandGet()));\n"
"\n\n"
"my ($strArchiveId, $strArchiveFile, $strCipherPass) = archiveGetCheck(undef, undef, $strSourceArchive, false);\n"
"\n\n\n\n\n"
"my $iResult = 0;\n"
"\n"
"if (!defined($strArchiveFile))\n"
"{\n"
"$iResult = 1;\n"
"}\n"
"else\n"
"{\n"
"\n"
"my $bSourceCompressed = $strArchiveFile =~ ('^.*\\.' . COMPRESS_EXT . '$') ? true : false;\n"
"\n\n\n"
"$oStorageRepo->copy(\n"
"$oStorageRepo->openRead(\n"
"STORAGE_REPO_ARCHIVE . \"/${strArchiveId}/${strArchiveFile}\", {bProtocolCompress => !$bSourceCompressed,\n"
"strCipherPass => defined($strCipherPass) ? $strCipherPass : undef}),\n"
"storageDb()->openWrite(\n"
"$strDestinationFile,\n"
"{bAtomic => $bAtomic, rhyFilter => $bSourceCompressed ?\n"
"[{strClass => STORAGE_FILTER_GZIP, rxyParam => [{strCompressType => STORAGE_DECOMPRESS}]}] : undef}));\n"
"}\n"
"\n\n"
"return logDebugReturn\n"
"(\n"
"$strOperation,\n"
"{name => 'iResult', value => $iResult}\n"
");\n"
"}\n"
"\n"
"push @EXPORT, qw(archiveGetFile);\n"
"\n"
"1;\n"
},
{
.name = "pgBackRest/Archive/Get/Get.pm",
.data =
"\n\n\n"
"package pgBackRest::Archive::Get::Get;\n"
"use parent 'pgBackRest::Archive::Base';\n"
"\n"
"use strict;\n"
"use warnings FATAL => qw(all);\n"
"use Carp qw(confess);\n"
"use English '-no_match_vars';\n"
"\n"
"use Exporter qw(import);\n"
"our @EXPORT = qw();\n"
"use Fcntl qw(SEEK_CUR O_RDONLY O_WRONLY O_CREAT);\n"
"use File::Basename qw(dirname basename);\n"
"use Scalar::Util qw(blessed);\n"
"\n"
"use pgBackRest::Common::Exception;\n"
"use pgBackRest::Common::Lock;\n"
"use pgBackRest::Common::Log;\n"
"use pgBackRest::Archive::Common;\n"
"use pgBackRest::Archive::Get::File;\n"
"use pgBackRest::Archive::Info;\n"
"use pgBackRest::Common::String;\n"
"use pgBackRest::Common::Wait;\n"
"use pgBackRest::Config::Config;\n"
"use pgBackRest::Db;\n"
"use pgBackRest::DbVersion;\n"
"use pgBackRest::Protocol::Helper;\n"
"use pgBackRest::Protocol::Storage::Helper;\n"
"use pgBackRest::Storage::Base;\n"
"use pgBackRest::Storage::Filter::Gzip;\n"
"use pgBackRest::Storage::Helper;\n"
"\n\n\n\n"
"sub process\n"
"{\n"
"my $self = shift;\n"
"\n\n"
"my\n"
"(\n"
"$strOperation,\n"
"$rstryCommandArg,\n"
") =\n"
"logDebugParam\n"
"(\n"
"__PACKAGE__ . '->process', \\@_,\n"
"{name => 'rstryCommandArg'},\n"
");\n"
"\n"
"my $iResult = 0;\n"
"\n\n"
"if (!isDbLocal())\n"
"{\n"
"confess &log(ERROR, cfgCommandName(CFGCMD_ARCHIVE_GET) . ' operation must run on db host', ERROR_HOST_INVALID);\n"
"}\n"
"\n\n\n"
"require pgBackRest::Archive::Get::Async;\n"
"(new pgBackRest::Archive::Get::Async(\n"
"storageSpool()->pathGet(STORAGE_SPOOL_ARCHIVE_IN), $self->{strBackRestBin}, $rstryCommandArg))->process();\n"
"\n\n"
"return logDebugReturn\n"
"(\n"
"$strOperation,\n"
"{name => 'iResult', value => $iResult, trace => true},\n"
");\n"
"}\n"
"\n"
"1;\n"
},
@@ -1987,7 +1688,6 @@ static const EmbeddedModule embeddedModule[] =
"use File::Basename;\n"
"\n"
"use pgBackRest::Archive::Common;\n"
"use pgBackRest::Archive::Get::Get;\n"
"use pgBackRest::Backup::Common;\n"
"use pgBackRest::Backup::File;\n"
"use pgBackRest::Backup::Info;\n"
@@ -9912,7 +9612,6 @@ static const EmbeddedModule embeddedModule[] =
"use Scalar::Util qw(looks_like_number);\n"
"\n"
"use pgBackRest::Archive::Common;\n"
"use pgBackRest::Archive::Get::Get;\n"
"use pgBackRest::Archive::Info;\n"
"use pgBackRest::Common::Exception;\n"
"use pgBackRest::Common::Ini;\n"
@@ -10812,15 +10511,6 @@ static const EmbeddedModule embeddedModule[] =
"new pgBackRest::Archive::Push::Push()->process($stryCommandArg[0]);\n"
"}\n"
"\n\n\n"
"elsif (cfgCommandTest(CFGCMD_ARCHIVE_GET_ASYNC))\n"
"{\n"
"\n"
"require pgBackRest::Archive::Get::Get;\n"
"pgBackRest::Archive::Get::Get->import();\n"
"\n"
"$iResult = new pgBackRest::Archive::Get::Get()->process(\\@stryCommandArg);\n"
"}\n"
"\n\n\n"
"elsif (cfgCommandTest(CFGCMD_REMOTE))\n"
"{\n"
"\n"
@@ -12975,8 +12665,6 @@ static const EmbeddedModule embeddedModule[] =
"use constant OP_ARCHIVE_PUSH_CHECK => 'archivePushCheck';\n"
"push @EXPORT, qw(OP_ARCHIVE_PUSH_CHECK);\n"
"\n\n"
"use constant OP_ARCHIVE_GET_FILE => 'archiveGetFile';\n"
"push @EXPORT, qw(OP_ARCHIVE_GET_FILE);\n"
"use constant OP_ARCHIVE_PUSH_FILE => 'archivePushFile';\n"
"push @EXPORT, qw(OP_ARCHIVE_PUSH_FILE);\n"
"\n\n"
@@ -13444,7 +13132,6 @@ static const EmbeddedModule embeddedModule[] =
"use warnings FATAL => qw(all);\n"
"use Carp qw(confess);\n"
"\n"
"use pgBackRest::Archive::Get::File;\n"
"use pgBackRest::Archive::Push::File;\n"
"use pgBackRest::Backup::File;\n"
"use pgBackRest::Common::Log;\n"
@@ -13480,7 +13167,6 @@ static const EmbeddedModule embeddedModule[] =
"\n\n"
"my $hCommandMap =\n"
"{\n"
"&OP_ARCHIVE_GET_FILE => sub {archiveGetFile(@{shift()})},\n"
"&OP_ARCHIVE_PUSH_FILE => sub {archivePushFile(@{shift()})},\n"
"&OP_BACKUP_FILE => sub {backupFile(@{shift()})},\n"
"&OP_RESTORE_FILE => sub {restoreFile(@{shift()})},\n"

View File

@@ -642,7 +642,7 @@ unit:
test:
# ----------------------------------------------------------------------------------------------------------------------------
- name: archive-common
total: 6
total: 7
coverage:
command/archive/common: full
@@ -656,34 +656,20 @@ unit:
# ----------------------------------------------------------------------------------------------------------------------------
- name: archive-get
total: 4
total: 5
perlReq: true
coverage:
command/archive/get/file: full
command/archive/get/get: full
# This test is flapping on co6 which seems to be due to some race condition. The tests are under active development, so for
# now we are disabling the co6 tests in the hopes that the cause will be caught later. It seems fairly certain that this is
# some sort of issue with the test and not the underlying code. No flakiness has been seen in the integration tests and
# this issue has never happened on another vm.
vm:
- co7
- u12
- u14
- u16
- u18
- d8
command/archive/get/protocol: full
# ----------------------------------------------------------------------------------------------------------------------------
- name: archive-get-perl
total: 3
total: 1
coverage:
Archive/Base: full
Archive/Get/Async: full
Archive/Get/File: partial
Archive/Get/Get: partial
# ----------------------------------------------------------------------------------------------------------------------------
- name: archive-push
@@ -698,6 +684,7 @@ unit:
total: 7
coverage:
Archive/Base: full
Archive/Push/Async: full
Archive/Push/File: partial
Archive/Push/Push: full

View File

@@ -14,9 +14,7 @@ use Carp qw(confess);
use Storable qw(dclone);
use pgBackRest::Archive::Common;
use pgBackRest::Archive::Get::Async;
use pgBackRest::Archive::Get::File;
use pgBackRest::Archive::Get::Get;
use pgBackRest::Archive::Info;
use pgBackRest::Common::Exception;
use pgBackRest::Common::Log;
@@ -191,190 +189,6 @@ sub run
substr($strWalSegment, 0, 16) . "/" . $strArchiveFile)})}, $strFileHash,
'check correct WAL archiveID when in multiple locations');
}
################################################################################################################################
if ($self->begin("Archive::Get::Get::get() sync"))
{
# archive.info missing
#---------------------------------------------------------------------------------------------------------------------------
$self->testException(sub {archiveGetFile($strWalSegment, $strDestinationFile, false)},
ERROR_FILE_MISSING,
ARCHIVE_INFO_FILE . " does not exist but is required to push/get WAL segments\n" .
"HINT: is archive_command configured in postgresql.conf?\n" .
"HINT: has a stanza-create been performed?\n" .
"HINT: use --no-archive-check to disable archive checks during backup if you have an alternate archiving scheme.");
# Create and save archive.info file
my $oArchiveInfo = new pgBackRest::Archive::Info(storageRepo()->pathGet(STORAGE_REPO_ARCHIVE), false,
{bLoad => false, bIgnoreMissing => true});
$oArchiveInfo->create(PG_VERSION_94, $self->dbSysId(PG_VERSION_94), false);
$oArchiveInfo->dbSectionSet(PG_VERSION_93, $self->dbSysId(PG_VERSION_93), $oArchiveInfo->dbHistoryIdGet(false) + 1);
$oArchiveInfo->dbSectionSet(PG_VERSION_94, $self->dbSysId(PG_VERSION_94), $oArchiveInfo->dbHistoryIdGet(false) + 10);
$oArchiveInfo->save();
# file not found
#---------------------------------------------------------------------------------------------------------------------------
$self->testResult(sub {archiveGetFile($strWalSegment, $strDestinationFile, false)}, 1,
"unable to find ${strWalSegment} in the archive");
# file found but is not a WAL segment
#---------------------------------------------------------------------------------------------------------------------------
$strArchivePath = $self->{strArchivePath} . "/" . PG_VERSION_94 . "-1/";
storageRepo()->pathCreate($strArchivePath);
storageRepo()->put($strArchivePath . BOGUS, BOGUS);
my $strBogusHash = cryptoHashOne('sha1', BOGUS);
# Create path to copy file
storageRepo()->pathCreate($strDestinationPath);
$self->testResult(sub {archiveGetFile(BOGUS, $strDestinationFile, false)}, 0,
"non-WAL segment copied");
# Confirm the correct file is copied
$self->testResult(sub {cryptoHashOne('sha1', ${storageRepo()->get($strDestinationFile)})}, $strBogusHash,
' check correct non-WAL copied from older archiveId');
# create same WAL segment in same DB but different archives and different hash values. Confirm latest one copied.
#---------------------------------------------------------------------------------------------------------------------------
my $strWalMajorPath = "${strArchivePath}/" . substr($strWalSegment, 0, 16);
my $strWalSegmentName = "${strWalSegment}-${strFileHash}";
# Put zero byte file in old archive
storageRepo()->pathCreate($strWalMajorPath);
storageRepo()->put("${strWalMajorPath}/${strWalSegmentName}");
# Create newest archive path
$strArchivePath = $self->{strArchivePath} . "/" . PG_VERSION_94 . "-12/";
$strWalMajorPath = "${strArchivePath}/" . substr($strWalSegment, 0, 16);
$strWalSegmentName = "${strWalSegment}-${strFileHash}";
# Store with actual data that will match the hash check
storageRepo()->pathCreate($strWalMajorPath, {bCreateParent => true});
storageRepo()->put("${strWalMajorPath}/${strWalSegmentName}", $strFileContent);
$self->testResult(sub {archiveGetFile($strWalSegmentName, $strDestinationFile, false)}, 0,
"WAL segment copied");
# Confirm the correct file is copied
$self->testResult(sub {cryptoHashOne('sha1', ${storageRepo()->get($strDestinationFile)})}, $strFileHash,
' check correct WAL copied when in multiple locations');
# Get files from an older DB version to simulate restoring from an old backup set to a database that is of that same version
#---------------------------------------------------------------------------------------------------------------------------
# Create same WAL name in older DB archive but with different data to ensure it is copied
$strArchivePath = $self->{strArchivePath} . "/" . PG_VERSION_93 . "-2/";
$strWalMajorPath = "${strArchivePath}/" . substr($strWalSegment, 0, 16);
$strWalSegmentName = "${strWalSegment}-${strFileHash}";
my $strWalContent = 'WALTESTDATA';
my $strWalHash = cryptoHashOne('sha1', $strWalContent);
# Store with actual data that will match the hash check
storageRepo()->pathCreate($strWalMajorPath, {bCreateParent => true});
storageRepo()->put("${strWalMajorPath}/${strWalSegmentName}", $strWalContent);
# Remove the destination file to ensure it is copied
storageTest()->remove($strDestinationFile);
# Overwrite current pg_control file with older version
$self->controlGenerate($self->{strDbPath}, PG_VERSION_93);
$self->testResult(sub {archiveGetFile($strWalSegmentName, $strDestinationFile, false)}, 0,
"WAL segment copied from older db backupset to same version older db");
# Confirm the correct file is copied
$self->testResult(sub {cryptoHashOne('sha1', ${storageRepo()->get($strDestinationFile)})}, $strWalHash,
' check correct WAL copied from older db');
}
################################################################################################################################
if ($self->begin("Archive::Get::Get::get() async"))
{
# Test error in local process when stanza has not been created
#---------------------------------------------------------------------------------------------------------------------------
my @stryWal = ('000000010000000A0000000A', '000000010000000A0000000B');
my $oGetAsync = new pgBackRest::Archive::Get::Async(
$self->{strSpoolPath}, $self->backrestExe(), \@stryWal);
$self->optionTestSet(CFGOPT_SPOOL_PATH, $self->{strRepoPath});
$self->configTestLoad(CFGCMD_ARCHIVE_GET_ASYNC);
$oGetAsync->process();
my $strErrorMessage =
"55\n" .
"raised from local-1 process: archive.info does not exist but is required to push/get WAL segments\n" .
"HINT: is archive_command configured in postgresql.conf?\n" .
"HINT: has a stanza-create been performed?\n" .
"HINT: use --no-archive-check to disable archive checks during backup if you have an alternate archiving scheme.";
$self->testResult(
sub {storageSpool()->list(STORAGE_SPOOL_ARCHIVE_IN)},
"(000000010000000A0000000A.error, 000000010000000A0000000B.error)", 'error files created');
$self->testResult(
${storageSpool()->get(STORAGE_SPOOL_ARCHIVE_IN . "/000000010000000A0000000A.error")}, $strErrorMessage,
"check error file contents");
storageSpool()->remove(STORAGE_SPOOL_ARCHIVE_IN . "/000000010000000A0000000A.error");
$self->testResult(
${storageSpool()->get(STORAGE_SPOOL_ARCHIVE_IN . "/000000010000000A0000000B.error")}, $strErrorMessage,
"check error file contents");
storageSpool()->remove(STORAGE_SPOOL_ARCHIVE_IN . "/000000010000000A0000000B.error");
# Create archive info file
#---------------------------------------------------------------------------------------------------------------------------
my $oArchiveInfo = new pgBackRest::Archive::Info($self->{strArchivePath}, false, {bIgnoreMissing => true});
$oArchiveInfo->create(PG_VERSION_94, $self->dbSysId(PG_VERSION_94), true);
my $strArchiveId = $oArchiveInfo->archiveId();
# Transfer first file
#---------------------------------------------------------------------------------------------------------------------------
my $strWalPath = "$self->{strRepoPath}/archive/db/${strArchiveId}/000000010000000A";
storageRepo()->pathCreate($strWalPath, {bCreateParent => true});
$self->walGenerate($strWalPath, PG_VERSION_94, 1, "000000010000000A0000000A", false, true, false);
$oGetAsync->processQueue();
$self->testResult(
sub {storageSpool()->list(STORAGE_SPOOL_ARCHIVE_IN)},
"(000000010000000A0000000A, 000000010000000A0000000B.ok)", 'WAL and OK file');
# Transfer second file
#---------------------------------------------------------------------------------------------------------------------------
@stryWal = ('000000010000000A0000000B');
storageSpool()->remove(STORAGE_SPOOL_ARCHIVE_IN . "/000000010000000A0000000B.ok");
$self->walGenerate($strWalPath, PG_VERSION_94, 1, "000000010000000A0000000B", false, true, false);
$oGetAsync->processQueue();
$self->testResult(
sub {storageSpool()->list(STORAGE_SPOOL_ARCHIVE_IN)},
"(000000010000000A0000000A, 000000010000000A0000000B)", 'WAL files');
# Error on main process
#---------------------------------------------------------------------------------------------------------------------------
@stryWal = ('000000010000000A0000000C');
storageTest()->put(storageTest()->openWrite($self->{strLockPath} . "/db.stop", {bPathCreate => true}), undef);
$oGetAsync->processQueue();
$self->testResult(
sub {storageSpool()->list(STORAGE_SPOOL_ARCHIVE_IN)},
"(000000010000000A0000000A, 000000010000000A0000000B, 000000010000000A0000000C.error)", 'WAL files and error file');
# Set protocol timeout low
#---------------------------------------------------------------------------------------------------------------------------
$self->optionTestSet(CFGOPT_PROTOCOL_TIMEOUT, 30);
$self->optionTestSet(CFGOPT_DB_TIMEOUT, 29);
$self->configTestLoad(CFGCMD_ARCHIVE_GET_ASYNC);
$oGetAsync->process();
}
}
1;

View File

@@ -113,6 +113,46 @@ testRun(void)
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), false, "suppress error");
}
// *****************************************************************************************************************************
if (testBegin("archiveAsyncStatusErrorWrite() and archiveAsyncStatusOkWrite()"))
{
StringList *argList = strLstNew();
strLstAddZ(argList, "pgbackrest");
strLstAdd(argList, strNewFmt("--spool-path=%s", testPath()));
strLstAddZ(argList, "--stanza=db");
strLstAddZ(argList, "archive-get-async");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
String *walSegment = strNew("000000010000000100000001");
TEST_RESULT_VOID(
archiveAsyncStatusErrorWrite(archiveModeGet, walSegment, 25, strNew("error message"), false), "write error");
TEST_RESULT_STR(
strPtr(strNewBuf(storageGetNP(storageNewReadNP(storageTest, strNew("archive/db/in/000000010000000100000001.error"))))),
"25\nerror message", "check error");
TEST_RESULT_VOID(
storageRemoveP(storageTest, strNew("archive/db/in/000000010000000100000001.error"), .errorOnMissing = true),
"remove error");
TEST_RESULT_VOID(
archiveAsyncStatusErrorWrite(archiveModeGet, walSegment, 66, strNew("multi-line\nerror message"), true),
"write error skip if ok (ok missing)");
TEST_RESULT_STR(
strPtr(strNewBuf(storageGetNP(storageNewReadNP(storageTest, strNew("archive/db/in/000000010000000100000001.error"))))),
"66\nmulti-line\nerror message", "check error");
TEST_RESULT_VOID(
storageRemoveP(storageTest, strNew("archive/db/in/000000010000000100000001.error"), .errorOnMissing = true),
"remove error");
TEST_RESULT_VOID(
archiveAsyncStatusOkWrite(archiveModeGet, walSegment), "write ok file");
TEST_RESULT_VOID(
archiveAsyncStatusErrorWrite(archiveModeGet, walSegment, 101, strNew("more error message"), true),
"write error skip if ok (ok present)");
TEST_RESULT_BOOL(
storageExistsNP(storageTest, strNew("archive/db/in/000000010000000100000001.error")), false, "error does not exist");
}
// *****************************************************************************************************************************
if (testBegin("walIsPartial()"))
{

View File

@@ -6,6 +6,8 @@ Test Archive Get Command
#include "common/harnessConfig.h"
#include "common/harnessFork.h"
#include "common/io/bufferRead.h"
#include "common/io/bufferWrite.h"
#include "compress/gzipCompress.h"
#include "storage/driver/posix/storage.h"
@@ -20,6 +22,16 @@ testRun(void)
Storage *storageTest = storageDriverPosixInterface(
storageDriverPosixNew(strNew(testPath()), STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, true, NULL));
// Start a protocol server to test the protocol directly
Buffer *serverWrite = bufNew(8192);
IoWrite *serverWriteIo = ioBufferWriteIo(ioBufferWriteNew(serverWrite));
ioWriteOpen(serverWriteIo);
ProtocolServer *server = protocolServerNew(
strNew("test"), strNew("test"), ioBufferReadIo(ioBufferReadNew(bufNew(0))), serverWriteIo);
bufUsedSet(serverWrite, 0);
// *****************************************************************************************************************************
if (testBegin("archiveGetCheck()"))
{
@@ -148,7 +160,8 @@ testRun(void)
String *walDestination = strNewFmt("%s/db/pg_wal/RECOVERYXLOG", testPath());
storagePathCreateNP(storageTest, strPath(walDestination));
TEST_RESULT_INT(archiveGetFile(archiveFile, walDestination, cipherTypeNone, NULL), 1, "WAL segment missing");
TEST_RESULT_INT(
archiveGetFile(storageTest, archiveFile, walDestination, false, cipherTypeNone, NULL), 1, "WAL segment missing");
// Create a WAL segment to copy
// -------------------------------------------------------------------------------------------------------------------------
@@ -163,7 +176,8 @@ testRun(void)
"repo/archive/test1/10-1/01ABCDEF01ABCDEF/01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")),
buffer);
TEST_RESULT_INT(archiveGetFile(archiveFile, walDestination, cipherTypeNone, NULL), 0, "WAL segment copied");
TEST_RESULT_INT(
archiveGetFile(storageTest, archiveFile, walDestination, false, cipherTypeNone, NULL), 0, "WAL segment copied");
TEST_RESULT_BOOL(storageExistsNP(storageTest, walDestination), true, " check exists");
TEST_RESULT_INT(storageInfoNP(storageTest, walDestination).size, 16 * 1024 * 1024, " check size");
@@ -213,9 +227,40 @@ testRun(void)
TEST_RESULT_INT(
archiveGetFile(
archiveFile, walDestination, cipherTypeAes256Cbc, strNew("12345678")), 0, "WAL segment copied");
storageTest, archiveFile, walDestination, false, cipherTypeAes256Cbc, strNew("12345678")), 0, "WAL segment copied");
TEST_RESULT_BOOL(storageExistsNP(storageTest, walDestination), true, " check exists");
TEST_RESULT_INT(storageInfoNP(storageTest, walDestination).size, 16 * 1024 * 1024, " check size");
// Check protocol function directly
// -------------------------------------------------------------------------------------------------------------------------
argList = strLstNew();
strLstAddZ(argList, "pgbackrest");
strLstAddZ(argList, "--stanza=test1");
strLstAdd(argList, strNewFmt("--repo1-path=%s/repo", testPath()));
strLstAdd(argList, strNewFmt("--pg1-path=%s/db", testPath()));
strLstAdd(argList, strNewFmt("--spool-path=%s/spool", testPath()));
strLstAddZ(argList, "--repo1-cipher-type=aes-256-cbc");
strLstAddZ(argList, "archive-get-async");
setenv("PGBACKREST_REPO1_CIPHER_PASS", "12345678", true);
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
unsetenv("PGBACKREST_REPO1_CIPHER_PASS");
storagePathCreateNP(storageTest, strNew("spool/archive/test1/in"));
VariantList *paramList = varLstNew();
varLstAdd(paramList, varNewStr(archiveFile));
TEST_RESULT_BOOL(
archiveGetProtocol(PROTOCOL_COMMAND_ARCHIVE_GET_STR, paramList, server), true, "protocol archive get");
TEST_RESULT_STR(strPtr(strNewBuf(serverWrite)), "{\"out\":0}\n", "check result");
TEST_RESULT_BOOL(
storageExistsNP(storageTest, strNewFmt("spool/archive/test1/in/%s", strPtr(archiveFile))), true, " check exists");
bufUsedSet(serverWrite, 0);
// Check invalid protocol function
// -------------------------------------------------------------------------------------------------------------------------
TEST_RESULT_BOOL(archiveGetProtocol(strNew(BOGUS_STR), paramList, server), false, "invalid function");
}
// *****************************************************************************************************************************
@@ -290,6 +335,168 @@ testRun(void)
"000000010000000A00000FFE|000000010000000A00000FFF", "check queue");
}
// *****************************************************************************************************************************
if (testBegin("cmdArchiveGetAsync()"))
{
harnessLogLevelSet(logLevelDetail);
StringList *argCleanList = strLstNew();
strLstAddZ(argCleanList, "pgbackrest");
strLstAdd(argCleanList, strNewFmt("--pg1-path=%s/pg", testPath()));
strLstAdd(argCleanList, strNewFmt("--repo1-path=%s/repo", testPath()));
strLstAdd(argCleanList, strNewFmt("--spool-path=%s/spool", testPath()));
strLstAddZ(argCleanList, "--stanza=test2");
strLstAddZ(argCleanList, "archive-get-async");
harnessCfgLoad(strLstSize(argCleanList), strLstPtr(argCleanList));
TEST_ERROR(cmdArchiveGetAsync(), ParamInvalidError, "at least one wal segment is required");
// Create pg_control file and archive.info
// -------------------------------------------------------------------------------------------------------------------------
storagePutNP(
storageNewWriteNP(storageTest, strNew("pg/" PG_PATH_GLOBAL "/" PG_FILE_PGCONTROL)),
pgControlTestToBuffer((PgControl){.version = PG_VERSION_10, .systemId = 0xFACEFACEFACEFACE}));
storagePutNP(
storageNewWriteNP(storageTest, strNew("repo/archive/test2/archive.info")),
bufNewZ(
"[backrest]\n"
"backrest-checksum=\"d962d8d7311d0ae5dc0b05892c15cfa2009d051e\"\n"
"backrest-format=5\n"
"backrest-version=\"2.11\"\n"
"\n"
"[db:history]\n"
"1={\"db-id\":18072658121562454734,\"db-version\":\"10\"}\n"));
// Get a single segment
// -------------------------------------------------------------------------------------------------------------------------
StringList *argList = strLstDup(argCleanList);
strLstAddZ(argList, "000000010000000100000001");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
storagePathCreateNP(storageSpoolWrite(), strNew(STORAGE_SPOOL_ARCHIVE_IN));
TEST_RESULT_VOID(
storagePutNP(
storageNewWriteNP(
storageTest,
strNew(
"repo/archive/test2/10-1/0000000100000001/"
"000000010000000100000001-abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd")),
NULL),
"normal WAL segment");
TEST_RESULT_VOID(cmdArchiveGetAsync(), "archive async");
harnessLogResult(
"P00 INFO: get 1 WAL file(s) from archive: 000000010000000100000001\n"
"P00 DETAIL: found 000000010000000100000001 in the archive");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000001")), true,
"check 000000010000000100000001 in spool");
// Get multiple segments where some are missing or errored
// -------------------------------------------------------------------------------------------------------------------------
argList = strLstDup(argCleanList);
strLstAddZ(argList, "000000010000000100000001");
strLstAddZ(argList, "000000010000000100000002");
strLstAddZ(argList, "000000010000000100000003");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
storagePathCreateNP(storageSpoolWrite(), strNew(STORAGE_SPOOL_ARCHIVE_IN));
TEST_RESULT_VOID(
storagePutNP(
storageNewWriteNP(
storageTest,
strNew(
"repo/archive/test2/10-1/0000000100000001/"
"000000010000000100000003-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")),
NULL),
"normal WAL segment");
TEST_RESULT_VOID(
storagePutNP(
storageNewWriteNP(
storageTest,
strNew(
"repo/archive/test2/10-1/0000000100000001/"
"000000010000000100000003-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
NULL),
"duplicate WAL segment");
TEST_RESULT_VOID(cmdArchiveGetAsync(), "archive async");
harnessLogResult(
"P00 INFO: get 3 WAL file(s) from archive: 000000010000000100000001...000000010000000100000003\n"
"P00 DETAIL: found 000000010000000100000001 in the archive\n"
"P00 DETAIL: unable to find 000000010000000100000002 in the archive\n"
"P00 WARN: could not get 000000010000000100000003 from the archive (will be retried): "
"[45] raised from local-1 protocol: duplicates found in archive for WAL segment 000000010000000100000003: "
"000000010000000100000003-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa, "
"000000010000000100000003-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb\n"
" HINT: are multiple primaries archiving to this stanza?");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000001")), true,
"check 000000010000000100000001 in spool");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000002")), false,
"check 000000010000000100000002 not in spool");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000002.ok")), true,
"check 000000010000000100000002.ok in spool");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000003")), false,
"check 000000010000000100000003 not in spool");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000003.error")), true,
"check 000000010000000100000003.error in spool");
protocolFree();
// -------------------------------------------------------------------------------------------------------------------------
storageRemoveP(
storageSpoolWrite(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000003.error"), .errorOnMissing = true);
argList = strLstNew();
strLstAddZ(argList, "pgbackrest-bogus");
strLstAdd(argList, strNewFmt("--pg1-path=%s/pg", testPath()));
strLstAdd(argList, strNewFmt("--repo1-path=%s/repo", testPath()));
strLstAdd(argList, strNewFmt("--spool-path=%s/spool", testPath()));
strLstAddZ(argList, "--stanza=test2");
strLstAddZ(argList, "archive-get-async");
strLstAddZ(argList, "000000010000000100000001");
strLstAddZ(argList, "000000010000000100000002");
strLstAddZ(argList, "000000010000000100000003");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_ERROR(
cmdArchiveGetAsync(), ExecuteError,
"local-1 process terminated unexpectedly [102]: unable to execute 'pgbackrest-bogus': [2] No such file or directory");
harnessLogResult(
"P00 INFO: get 3 WAL file(s) from archive: 000000010000000100000001...000000010000000100000003");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000001.error")), false,
"check 000000010000000100000001.error not in spool");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000002.error")), false,
"check 000000010000000100000002.error not in spool");
TEST_RESULT_BOOL(
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000003.error")), true,
"check 000000010000000100000003.error in spool");
TEST_RESULT_STR(
strPtr(
strNewBuf(
storageGetNP(
storageNewReadNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000003.error"))))),
"102\nlocal-1 process terminated unexpectedly [102]: unable to execute 'pgbackrest-bogus': "
"[2] No such file or directory",
"check error");
}
// *****************************************************************************************************************************
if (testBegin("cmdArchiveGet()"))
{

View File

@@ -647,7 +647,8 @@ testRun(void)
strLstAddZ(argList, "/usr/bin/pgbackrest");
strLstAddZ(argList, "--stanza=db");
strLstAddZ(argList, "--protocol-timeout=10");
strLstAddZ(argList, "archive-get");
strLstAddZ(argList, "--process-max=2");
strLstAddZ(argList, "archive-get-async");
harnessCfgLoad(strLstSize(argList), strLstPtr(argList));
TEST_ASSIGN(client, protocolLocalGet(protocolStorageTypeRepo, 1), "get local protocol");
@@ -655,7 +656,6 @@ testRun(void)
TEST_RESULT_PTR(protocolHelper.clientLocal[0].client, client, "check location in cache");
TEST_RESULT_VOID(protocolFree(), "free local and remote protocol objects");
TEST_RESULT_VOID(protocolFree(), "free local and remote protocol objects again");
}
FUNCTION_HARNESS_RESULT_VOID();