1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2024-12-16 10:20:02 +02:00
pgbackrest/lib/pgBackRest/Archive/Push/Async.pm
David Steele 4d6c56b4d7 Open log file after async process is completely separated from the main process.
This prevents the main process from also logging to the file.

Suggested by Jens Wilke.
2017-09-20 08:08:36 -04:00

363 lines
13 KiB
Perl

####################################################################################################################################
# ARCHIVE PUSH ASYNC MODULE
####################################################################################################################################
package pgBackRest::Archive::Push::Async;
use parent 'pgBackRest::Archive::Push::Push';
use strict;
use warnings FATAL => qw(all);
use Carp qw(confess);
use English '-no_match_vars';
use Exporter qw(import);
our @EXPORT = qw();
use Fcntl qw(SEEK_CUR O_RDONLY O_WRONLY O_CREAT);
use File::Basename qw(dirname basename);
use IO::Socket::UNIX;
use POSIX qw(setsid);
use Scalar::Util qw(blessed);
use pgBackRest::Common::Exception;
use pgBackRest::Common::Lock;
use pgBackRest::Common::Log;
use pgBackRest::Archive::Common;
use pgBackRest::Archive::Info;
use pgBackRest::Archive::Push::Push;
use pgBackRest::Common::String;
use pgBackRest::Common::Wait;
use pgBackRest::Config::Config;
use pgBackRest::Db;
use pgBackRest::DbVersion;
use pgBackRest::Protocol::Local::Process;
use pgBackRest::Protocol::Helper;
use pgBackRest::Storage::Helper;
use pgBackRest::Version;
####################################################################################################################################
# constructor
####################################################################################################################################
sub new
{
    # The class name is removed from @_ so that only the constructor parameters remain for logDebugParam() below
    my $class = shift;

    # Build the object with the parent constructor and rebless it into the requested class
    my $self = bless($class->SUPER::new(), $class);

    # Read the parameters (and log them at debug level) into locals first. strBackRestBin is declared with a default of
    # BACKREST_BIN.
    my ($strOperation, $strWalPath, $strSpoolPath, $strBackRestBin) = logDebugParam(
        __PACKAGE__ . '->new', \@_,
        {name => 'strWalPath'},
        {name => 'strSpoolPath'},
        {name => 'strBackRestBin', default => BACKREST_BIN},
    );

    # Store the parameters on the object for use by the other methods
    $self->{strWalPath} = $strWalPath;
    $self->{strSpoolPath} = $strSpoolPath;
    $self->{strBackRestBin} = $strBackRestBin;

    # Return the new object and log the return value
    return logDebugReturn($strOperation, {name => 'self', value => $self});
}
####################################################################################################################################
# process
#
# Fork and run the async process if it is not already running.
####################################################################################################################################
sub process
{
    my $self = shift;

    # Assign function parameters, defaults, and log debug info
    my ($strOperation) = logDebugParam(__PACKAGE__ . '->process');

    # The current process is the client unless the fork below succeeds and this is the child
    my $bClient = true;

    # This first lock request is a quick test to see if the async process is running. If the lock is successful we need to release
    # the lock, fork, and then let the async process acquire its own lock. Otherwise the lock will be held by the client process
    # which will soon exit and leave the async process unlocked.
    if (lockAcquire(cfgCommandName(cfgCommandGet()), false))
    {
        &log(TEST, TEST_ARCHIVE_PUSH_ASYNC_START);

        lockRelease(cfgCommandName(cfgCommandGet()));

        # fork() returns undef when the child cannot be created. Check for that explicitly so the failure is reported with the
        # OS error rather than surfacing as a fatal "uninitialized value" warning from the numeric comparison below.
        my $iChildPid = fork();

        # uncoverable branch true - fork failed
        if (!defined($iChildPid))
        {
            confess &log(ERROR, "unable to fork async process: $OS_ERROR", ERROR_ASSERT);
        }

        # fork() returns 0 in the child, which becomes the async process
        $bClient = $iChildPid == 0 ? false : true;
    }
    else
    {
        logDebugMisc($strOperation, 'async archive-push process is already running');
    }

    # Run async process
    if (!$bClient)
    {
        # If the lock cannot be reacquired here the child does no work and simply returns bClient = false
        # uncoverable branch false - reacquire the lock since it was released by the client process above
        if (lockAcquire(cfgCommandName(cfgCommandGet()), false))
        {
            # uncoverable branch true - chdir to /
            chdir '/'
                or confess &log(ERROR, "unable to chdir to /: $OS_ERROR", ERROR_PATH_MISSING);

            # uncoverable branch true - close stdin
            open(STDIN, '<', '/dev/null')
                or confess &log(ERROR, "unable to close stdin: $OS_ERROR", ERROR_FILE_OPEN);

            # uncoverable branch true - close stdout
            open(STDOUT, '>', '/dev/null')
                or confess &log(ERROR, "unable to close stdout: $OS_ERROR", ERROR_FILE_OPEN);

            # uncoverable branch true - close stderr
            open(STDERR, '>', '/dev/null')
                or confess &log(ERROR, "unable to close stderr: $OS_ERROR", ERROR_FILE_OPEN);

            # uncoverable branch true - create new session group
            setsid()
                or confess &log(ERROR, "unable to create new session group: $OS_ERROR", ERROR_ASSERT);

            # Open the log file only now that the async process is fully detached from the client process
            logFileSet(storageLocal(), cfgOption(CFGOPT_LOG_PATH) . '/' . cfgOption(CFGOPT_STANZA) . '-archive-async');

            # Start processing
            $self->processServer();
        }
    }

    # Return from function and log return values if any. bClient is true in the client process and false in the forked async
    # process.
    return logDebugReturn
    (
        $strOperation,
        {name => 'bClient', value => $bClient, trace => true}
    );
}
####################################################################################################################################
# initServer
#
# Create the spool directory and initialize the archive process.
####################################################################################################################################
sub initServer
{
    my ($self) = @_;

    # Log entry at debug level (this method takes no parameters)
    my ($strOperation) = logDebugParam(__PACKAGE__ . '->initServer');

    # Make sure the spool path exists, creating parent paths as needed and tolerating a path that is already there
    storageSpool()->pathCreate($self->{strSpoolPath}, {bCreateParent => true, bIgnoreExists => true});

    # Second constructor argument: half of protocol-timeout when that option is below 60, otherwise 30.
    # NOTE(review): presumably a keep-alive/timeout interval for the local process - confirm against Local::Process.
    my $fProcessTimeout;

    if (cfgOption(CFGOPT_PROTOCOL_TIMEOUT) < 60)
    {
        $fProcessTimeout = cfgOption(CFGOPT_PROTOCOL_TIMEOUT) / 2;
    }
    else
    {
        $fProcessTimeout = 30;
    }

    # Create the local process object used to push WAL and register host 1 with process-max as its second argument
    my $oArchiveProcess = new pgBackRest::Protocol::Local::Process(
        CFGOPTVAL_LOCAL_TYPE_BACKUP, $fProcessTimeout, $self->{strBackRestBin}, false);

    $self->{oArchiveProcess} = $oArchiveProcess;
    $oArchiveProcess->hostAdd(1, cfgOption(CFGOPT_PROCESS_MAX));

    # Log the return at debug level (no return values)
    return logDebugReturn($strOperation);
}
####################################################################################################################################
# processServer
#
# Set up the server and process the queue. This function is separate from processQueue() for testing purposes.
####################################################################################################################################
sub processServer
{
    my ($self) = @_;

    # Log entry at debug level (this method takes no parameters)
    my ($strOperation) = logDebugParam(__PACKAGE__ . '->processServer');

    # Deliberately a single pass rather than a loop: the async process is allowed to exit periodically, each run naturally
    # lasts longer as the queue grows, and a single pass is far easier to test.

    # Initialization must come first since processQueue() uses the archive process object that initServer() creates
    $self->initServer();

    # Push whatever is currently in the queue
    $self->processQueue();

    # Log the return at debug level (no return values)
    return logDebugReturn($strOperation);
}
####################################################################################################################################
# processQueue
#
# Push WAL to the archive.
####################################################################################################################################
sub processQueue
{
    my $self = shift;

    # Assign function parameters, defaults, and log debug info
    my ($strOperation) = logDebugParam(__PACKAGE__ . '->processQueue');

    # Get jobs to process
    my $stryWalFile = $self->readyList();

    # Queue the jobs. Each job carries the WAL path, the WAL file name, and the compress option as parameters.
    foreach my $strWalFile (@{$stryWalFile})
    {
        $self->{oArchiveProcess}->queueJob(
            1, 'default', $strWalFile, OP_ARCHIVE_PUSH_FILE, [$self->{strWalPath}, $strWalFile, cfgOption(CFGOPT_COMPRESS)]);
    }

    # Process jobs if there are any
    my $iOkTotal = 0;
    my $iErrorTotal = 0;
    my $iDropTotal = 0;

    if ($self->{oArchiveProcess}->jobTotal() > 0)
    {
        # Log the number of files and the first (and last, when more than one) file name
        &log(INFO,
            'push ' . @{$stryWalFile} . ' WAL file(s) to archive: ' .
                ${$stryWalFile}[0] . (@{$stryWalFile} > 1 ? "...${$stryWalFile}[-1]" : ''));

        eval
        {
            # Hold a lock when the repo is remote to be sure no other process is pushing WAL
            !isRepoLocal() && protocolGet(CFGOPTVAL_REMOTE_TYPE_BACKUP);

            # Each call to process() returns a batch of completed jobs; the loop ends when it returns a false value
            while (my $hyJob = $self->{oArchiveProcess}->process())
            {
                # Send keep alives to protocol
                protocolKeepAlive();

                foreach my $hJob (@{$hyJob})
                {
                    # The WAL file name is the second job parameter (see queueJob() above)
                    my $strWalFile = @{$hJob->{rParam}}[1];
                    my $strWarning = @{$hJob->{rResult}}[0];

                    # If error then write out an error file
                    if (defined($hJob->{oException}))
                    {
                        $self->walStatusWrite(
                            WAL_STATUS_ERROR, $strWalFile, $hJob->{oException}->code(), $hJob->{oException}->message());

                        $iErrorTotal++;

                        &log(WARN,
                            "could not push WAL file ${strWalFile} to archive (will be retried): [" .
                                $hJob->{oException}->code() . "] " . $hJob->{oException}->message());
                    }
                    # Else write success. When the job returned a warning it is stored in the status file with code 0.
                    else
                    {
                        $self->walStatusWrite(
                            WAL_STATUS_OK, $strWalFile, defined($strWarning) ? 0 : undef, $strWarning);

                        $iOkTotal++;

                        &log(DETAIL, "pushed WAL file ${strWalFile} to archive", undef, undef, undef, $hJob->{iProcessId});
                    }
                }

                # Drop any jobs that exceed the queue max
                if (cfgOptionTest(CFGOPT_ARCHIVE_QUEUE_MAX))
                {
                    my $stryDropList = $self->dropList($self->readyList());

                    if (@{$stryDropList} > 0)
                    {
                        # Dropped files get an ok status with code 0 and a message explaining why they were dropped
                        foreach my $strDropFile (@{$stryDropList})
                        {
                            $self->walStatusWrite(
                                WAL_STATUS_OK, $strDropFile, 0,
                                "dropped WAL file ${strDropFile} because archive queue exceeded " .
                                    cfgOption(CFGOPT_ARCHIVE_QUEUE_MAX) . ' bytes');

                            $iDropTotal++;
                        }

                        $self->{oArchiveProcess}->dequeueJobs(1, 'default');
                    }
                }
            }

            # Return true so the error handler below only runs when an exception was thrown
            return 1;
        }
        or do
        {
            # Get error info
            my $iCode = exceptionCode($EVAL_ERROR);
            my $strMessage = exceptionMessage($EVAL_ERROR);

            # Error all ready jobs
            foreach my $strWalFile (@{$self->readyList()})
            {
                $self->walStatusWrite(
                    WAL_STATUS_ERROR, $strWalFile, $iCode, $strMessage);
            }
        }
    }

    # Return the totals. iNewTotal is the number of files found in the ready list when the function started.
    return logDebugReturn
    (
        $strOperation,
        {name => 'iNewTotal', value => scalar(@{$stryWalFile})},
        {name => 'iDropTotal', value => $iDropTotal},
        {name => 'iOkTotal', value => $iOkTotal},
        {name => 'iErrorTotal', value => $iErrorTotal}
    );
}
####################################################################################################################################
# walStatusWrite
#
# Write out a status file to the spool path with information about the success or failure of an archive push.
####################################################################################################################################
sub walStatusWrite
{
    my $self = shift;

    # Assign function parameters, defaults, and log debug info. The operation name matches the sub name so debug logs can be
    # traced back to this function.
    my
    (
        $strOperation,
        $strType,
        $strWalFile,
        $iCode,
        $strMessage,
    ) =
        logDebugParam
        (
            __PACKAGE__ . '->walStatusWrite', \@_,
            {name => 'strType'},
            {name => 'strWalFile'},
            {name => 'iCode', required => false},
            {name => 'strMessage', required => false},
        );

    # Remove any existing error file unless a new one is about to be written
    if ($strType ne WAL_STATUS_ERROR)
    {
        # A missing error file is not an error
        storageSpool()->remove("$self->{strSpoolPath}/${strWalFile}.error", {bIgnoreMissing => true});
    }

    # Build the status file content. It is left undefined when no code is given, which is only allowed for non-error statuses.
    my $strStatus;

    if (defined($iCode))
    {
        # A code without a message is a programming error
        if (!defined($strMessage))
        {
            confess &log(ASSERT, 'strMessage must be set when iCode is set');
        }

        # The code goes on the first line and the message follows
        $strStatus = "${iCode}\n${strMessage}";
    }
    elsif ($strType eq WAL_STATUS_ERROR)
    {
        confess &log(ASSERT, 'error status must have iCode and strMessage set');
    }

    # Write the status file, named <WAL file>.<type>, to the spool path (opened with bAtomic => true)
    storageSpool()->put(
        storageSpool()->openWrite("$self->{strSpoolPath}/${strWalFile}.${strType}", {bAtomic => true}), $strStatus);
}
1;