1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-03-03 14:52:21 +02:00

Exceptions are now passed back from threads as messages when possible rather than raised directly.

This commit is contained in:
David Steele 2016-07-29 18:21:57 -04:00
parent 93320b865e
commit b447863218
2 changed files with 180 additions and 132 deletions

View File

@ -152,6 +152,10 @@
<p>All remote types now take locks. The exceptions date to when the test harness and <backrest/> were running in the same VM and no longer apply.</p>
</release-item>
<release-item>
<p>Exceptions are now passed back from threads as messages when possible rather than raised directly.</p>
</release-item>
<release-item>
<p>Temp files created during backup are now placed in the same directory as the target file.</p>
</release-item>

View File

@ -11,8 +11,10 @@ use Carp qw(confess);
use Exporter qw(import);
our @EXPORT = qw();
use File::Basename;
use Scalar::Util qw(blessed);
use lib dirname($0) . '/../lib';
use pgBackRest::Common::Exception;
use pgBackRest::Common::Log;
use pgBackRest::Common::Wait;
use pgBackRest::Config::Config;
@ -78,96 +80,121 @@ sub threadGroupThread
}
&log(TRACE, "$$oCommand{function} thread started");
my $oFile = undef;
# Get the protocol object
my $oProtocol = protocolGet(undef, false, $iThreadIdx + 1);
# Create a file object
my $oFile = new pgBackRest::File
(
optionGet(OPTION_STANZA),
optionGet(OPTION_REPO_PATH),
optionRemoteType(),
$oProtocol,
undef, undef,
$iThreadIdx + 1
);
# Notify parent that init is complete
threadMessage($oyResultQueue[$iThreadIdx], 'init');
my $iDirection = $iThreadIdx % 2 == 0 ? 1 : -1; # Size of files currently copied by this thread
# Initialize the starting and current queue index based in the total number of threads in relation to this thread
my $iQueueStartIdx = int((@{$$oCommand{param}{queue}} / $$oCommand{thread_total}) * $iThreadIdx);
my $iQueueIdx = $iQueueStartIdx;
# Keep track of progress (ignored for threaded backup and restore)
my $lSizeCurrent = 0;
# Loop through all the queues (exit when the original queue is reached)
do
eval
{
&log(TRACE, "reading queue ${iQueueIdx}, start queue ${iQueueStartIdx}");
# Get the protocol object
my $oProtocol = protocolGet(undef, false, $iThreadIdx + 1);
while (my $oMessage = ${$$oCommand{param}{queue}}[$iQueueIdx]->dequeue_nb())
# Create a file object
$oFile = new pgBackRest::File
(
optionGet(OPTION_STANZA),
optionGet(OPTION_REPO_PATH),
optionRemoteType(),
$oProtocol,
undef, undef,
$iThreadIdx + 1
);
# Notify parent that init is complete
threadMessage($oyResultQueue[$iThreadIdx], 'init');
my $iDirection = $iThreadIdx % 2 == 0 ? 1 : -1; # Size of files currently copied by this thread
# Initialize the starting and current queue index based in the total number of threads in relation to this thread
my $iQueueStartIdx = int((@{$$oCommand{param}{queue}} / $$oCommand{thread_total}) * $iThreadIdx);
my $iQueueIdx = $iQueueStartIdx;
# Keep track of progress (ignored for threaded backup and restore)
my $lSizeCurrent = 0;
# Loop through all the queues (exit when the original queue is reached)
do
{
if ($$oCommand{function} eq 'restore')
{
restoreFile($oMessage, $$oCommand{param}{copy_time_begin}, $$oCommand{param}{delta}, $$oCommand{param}{force},
$$oCommand{param}{backup_path}, $$oCommand{param}{source_compression},
$$oCommand{param}{current_user}, $$oCommand{param}{current_group}, $oFile);
}
elsif ($$oCommand{function} eq 'backup')
{
# Result hash that can be passed back to the master process
my $oResult = {};
&log(TRACE, "reading queue ${iQueueIdx}, start queue ${iQueueStartIdx}");
# Backup the file
($$oResult{copied}, $lSizeCurrent, $$oResult{size}, $$oResult{repo_size}, $$oResult{checksum}) =
backupFile($oFile, $$oMessage{db_file}, $$oMessage{repo_file}, $$oCommand{param}{compress},
$$oMessage{checksum}, $$oMessage{modification_time}, $$oMessage{size});
# Send a message to update the manifest
$$oResult{repo_file} = $$oMessage{repo_file};
$$oCommand{param}{result_queue}->enqueue($oResult);
}
else
while (my $oMessage = ${$$oCommand{param}{queue}}[$iQueueIdx]->dequeue_nb())
{
confess &log(ERROR, "unknown command");
if ($$oCommand{function} eq 'restore')
{
restoreFile($oMessage, $$oCommand{param}{copy_time_begin}, $$oCommand{param}{delta}, $$oCommand{param}{force},
$$oCommand{param}{backup_path}, $$oCommand{param}{source_compression},
$$oCommand{param}{current_user}, $$oCommand{param}{current_group}, $oFile);
}
elsif ($$oCommand{function} eq 'backup')
{
# Result hash that can be passed back to the master process
my $oResult = {};
# Backup the file
($$oResult{copied}, $lSizeCurrent, $$oResult{size}, $$oResult{repo_size}, $$oResult{checksum}) =
backupFile($oFile, $$oMessage{db_file}, $$oMessage{repo_file}, $$oCommand{param}{compress},
$$oMessage{checksum}, $$oMessage{modification_time}, $$oMessage{size});
# Send a message to update the manifest
$$oResult{repo_file} = $$oMessage{repo_file};
$$oCommand{param}{result_queue}->enqueue($oResult);
}
else
{
confess &log(ERROR, "unknown command");
}
# Keep the protocol layer from timing out while checksumming
$oProtocol->keepAlive();
}
# Keep the protocol layer from timing out while checksumming
$oProtocol->keepAlive();
# Even numbered threads move up when they have finished a queue, odd numbered threads move down
$iQueueIdx += $iDirection;
# Reset the queue index when it goes over or under the number of queues
if ($iQueueIdx < 0)
{
$iQueueIdx = @{$$oCommand{param}{queue}} - 1;
}
elsif ($iQueueIdx >= @{$$oCommand{param}{queue}})
{
$iQueueIdx = 0;
}
}
while ($iQueueIdx != $iQueueStartIdx);
# Even numbered threads move up when they have finished a queue, odd numbered threads move down
$iQueueIdx += $iDirection;
# Notify parent of shutdown
threadMessage($oyResultQueue[$iThreadIdx], 'shutdown');
threadMessageExpect($oyMessageQueue[$iThreadIdx], 'continue');
# Reset the queue index when it goes over or under the number of queues
if ($iQueueIdx < 0)
# Destroy the file object
undef($oFile);
# Notify the parent process of thread exit
threadMessage($oyResultQueue[$iThreadIdx], 'complete');
&log(TRACE, "$$oCommand{function} thread exiting");
};
if ($@)
{
my $oMessage = $@;
threadMessage($oyResultQueue[$iThreadIdx], 'error');
threadMessageExpect($oyMessageQueue[$iThreadIdx], 'continue');
undef($oFile);
threadMessage($oyResultQueue[$iThreadIdx], 'complete');
if (blessed($oMessage) && $oMessage->isa('pgBackRest::Common::Exception'))
{
$iQueueIdx = @{$$oCommand{param}{queue}} - 1;
threadMessage($oyResultQueue[$iThreadIdx], $oMessage->code());
threadMessage($oyResultQueue[$iThreadIdx], $oMessage->message());
}
elsif ($iQueueIdx >= @{$$oCommand{param}{queue}})
else
{
$iQueueIdx = 0;
threadMessage($oyResultQueue[$iThreadIdx], ERROR_UNKNOWN);
threadMessage($oyResultQueue[$iThreadIdx], $oMessage);
}
}
while ($iQueueIdx != $iQueueStartIdx);
# Notify parent of shutdown
threadMessage($oyResultQueue[$iThreadIdx], 'shutdown');
threadMessageExpect($oyMessageQueue[$iThreadIdx], 'continue');
# Destroy the file object
undef($oFile);
# Notify the parent process of thread exit
$oyResultQueue[$iThreadIdx]->enqueue('complete');
&log(TRACE, "$$oCommand{function} thread exiting");
}
}
@ -208,7 +235,7 @@ sub threadMessageExpect
if (defined($iTimeout))
{
&log(TRACE, "waiting for '${strExpected}' message from ${strContext}");
&log(TRACE, "waiting for '" . (defined($strExpected) ? $strExpected : '[undef]') . "' message from ${strContext}");
my $oWait = waitInit($iTimeout);
@ -222,19 +249,19 @@ sub threadMessageExpect
{
$strMessage = $oQueue->dequeue_nb();
return false if !defined($strMessage);
return if !defined($strMessage);
}
# Throw an exeception when the message was not received
if (!defined($strMessage) || $strMessage ne $strExpected)
if (!defined($strMessage) || (defined($strExpected) && $strMessage ne $strExpected))
{
confess &log(ASSERT, "expected message '$strExpected' from ${strContext} but " .
(defined($strMessage) ? "got '$strMessage'" : "timed out after ${iTimeout} second(s)"));
}
&log(TRACE, "got '${strExpected}' message from ${strContext}");
&log(TRACE, "got '" . (defined($strExpected) ? $strExpected : '[undef]') . "' message from ${strContext}");
return true;
return $strMessage;
}
####################################################################################################################################
@ -276,79 +303,96 @@ sub threadGroupComplete
# Wait for all threads to complete and handle errors
my $iThreadComplete = 0;
my $lTimeBegin = time();
my $iFirstErrorCode;
my $strFirstError;
my $iFirstErrorThreadIdx;
&log(TRACE, "waiting for " . @oyThread . " threads to complete");
# Rejoin the threads
# while ($iThreadComplete < @oyThread)
# {
waitHiRes(.1);
waitHiRes(.1);
# If a timeout has been defined, make sure we have not been running longer than that
if (defined($iTimeout))
# If a timeout has been defined, make sure we have not been running longer than that
if (defined($iTimeout))
{
if (time() - $lTimeBegin >= $iTimeout)
{
if (time() - $lTimeBegin >= $iTimeout)
{
confess &log(ERROR, "threads have been running more than ${iTimeout} seconds, exiting...");
}
confess &log(ERROR, "threads have been running more than ${iTimeout} seconds, exiting...");
}
}
for (my $iThreadIdx = 0; $iThreadIdx < @oyThread; $iThreadIdx++)
for (my $iThreadIdx = 0; $iThreadIdx < @oyThread; $iThreadIdx++)
{
if ($byThreadRunning[$iThreadIdx])
{
if ($byThreadRunning[$iThreadIdx])
{
if (threadMessageExpect($oyResultQueue[$iThreadIdx], 'shutdown', $iThreadIdx, true))
{
threadMessage($oyMessageQueue[$iThreadIdx], 'continue', $iThreadIdx);
threadMessageExpect($oyResultQueue[$iThreadIdx], 'complete', $iThreadIdx);
my $strMessage = threadMessageExpect($oyResultQueue[$iThreadIdx], undef, $iThreadIdx, true);
# Check for thread shutdown
if (defined($strMessage) && $strMessage eq 'shutdown')
{
threadMessage($oyMessageQueue[$iThreadIdx], 'continue', $iThreadIdx);
threadMessageExpect($oyResultQueue[$iThreadIdx], 'complete', $iThreadIdx);
$byThreadRunning[$iThreadIdx] = false;
$iThreadComplete++;
}
# Check for handled errors
if (defined($strMessage) && $strMessage eq 'error')
{
threadMessage($oyMessageQueue[$iThreadIdx], 'continue', $iThreadIdx);
threadMessageExpect($oyResultQueue[$iThreadIdx], 'complete', $iThreadIdx);
$byThreadRunning[$iThreadIdx] = false;
$iThreadComplete++;
if (!defined($strFirstError))
{
$iFirstErrorCode = threadMessageExpect($oyResultQueue[$iThreadIdx], undef, $iThreadIdx);
$strFirstError = threadMessageExpect($oyResultQueue[$iThreadIdx], undef, $iThreadIdx);
$iFirstErrorThreadIdx = $iThreadIdx;
}
}
# Check for unhandled errors
my $oError = $oyThread[$iThreadIdx]->error();
if (defined($oError))
{
my $strError;
if ($oError->isa('pgBackRest::Common::Exception'))
{
$strError = $oError->message();
}
else
{
$strError = $oError;
&log(ERROR, "thread " . ($iThreadIdx) . ": ${strError}");
}
if (!defined($strFirstError))
{
$strFirstError = $strError;
$iFirstErrorThreadIdx = $iThreadIdx;
}
if ($byThreadRunning[$iThreadIdx])
{
$byThreadRunning[$iThreadIdx] = false;
$iThreadComplete++;
}
# Check for errors
my $oError = $oyThread[$iThreadIdx]->error();
if (defined($oError))
{
my $strError;
if ($oError->isa('pgBackRest::Common::Exception'))
{
$strError = $oError->message();
}
else
{
$strError = $oError;
&log(ERROR, "thread " . ($iThreadIdx) . ": ${strError}");
}
if (!defined($strFirstError))
{
$strFirstError = $strError;
$iFirstErrorThreadIdx = $iThreadIdx;
}
if ($byThreadRunning[$iThreadIdx])
{
$byThreadRunning[$iThreadIdx] = false;
$iThreadComplete++;
}
}
}
else
{
$iThreadComplete++;
}
}
# }
else
{
$iThreadComplete++;
}
}
# If there were errors then confess them
if (defined($strFirstError) && $bConfessOnError)
{
confess &log(ERROR, 'error in thread' . ($iFirstErrorThreadIdx + 1) . ": $strFirstError");
confess &log(ERROR, 'error in thread ' . ($iFirstErrorThreadIdx + 1) . ": $strFirstError", $iFirstErrorCode);
}
# Return true if all threads have completed