1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2024-12-14 10:13:05 +02:00
pgbackrest/lib/BackRest/ThreadGroup.pm
David Steele 7081c8b867 New model where threads are created early and destroyed late.
Backups now work like restores in terms of how jobs are queued.
Split out BackupFile and RestoreFile for easier multi-threading/processing.
2015-04-07 07:34:37 -04:00

381 lines
14 KiB
Perl

####################################################################################################################################
# THREADGROUP MODULE
####################################################################################################################################
package BackRest::ThreadGroup;
use threads;
use strict;
use warnings FATAL => qw(all);
use Carp qw(confess);
use File::Basename;
use lib dirname($0) . '/../lib';
use BackRest::Utility;
use BackRest::Config;
use BackRest::RestoreFile;
use BackRest::BackupFile;
####################################################################################################################################
# MODULE EXPORTS
####################################################################################################################################
use Exporter qw(import);
our @EXPORT = qw(threadGroupCreate threadGroupRun threadGroupComplete threadGroupDestroy);
my @oyThread;
my @oyMessageQueue;
my @oyCommandQueue;
my @oyResultQueue;
my @byThreadRunning;
####################################################################################################################################
# threadGroupCreate
####################################################################################################################################
sub threadGroupCreate
{
# If thread-max is not defined then this operation does not use threads
if (!optionTest(OPTION_THREAD_MAX))
{
return;
}
# Get thread-max
my $iThreadMax = optionGet(OPTION_THREAD_MAX);
# Only create threads when thread-max > 1
if ($iThreadMax > 1)
{
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++)
{
push @oyCommandQueue, Thread::Queue->new();
push @oyMessageQueue, Thread::Queue->new();
push @oyResultQueue, Thread::Queue->new();
push @oyThread, (threads->create(\&threadGroupThread, $iThreadIdx));
push @byThreadRunning, false;
}
}
}
####################################################################################################################################
# threadGroupThread
####################################################################################################################################
sub threadGroupThread
{
my $iThreadIdx = shift;
# When a KILL signal is received, immediately abort
$SIG{'KILL'} = sub {threads->exit();};
while (my $oCommand = $oyCommandQueue[$iThreadIdx]->dequeue())
{
# Exit thread
if ($$oCommand{function} eq 'exit')
{
&log(TRACE, 'thread terminated');
return;
}
&log(TRACE, "$$oCommand{function} thread started");
# Create a file object
my $oFile = new BackRest::File
(
optionGet(OPTION_STANZA),
optionRemoteTypeTest(BACKUP) ? optionGet(OPTION_REPO_REMOTE_PATH) : optionGet(OPTION_REPO_PATH),
optionRemoteType(),
optionRemote(undef, false),
undef, undef,
$iThreadIdx + 1
);
# Notify parent that init is complete
threadMessage($oyResultQueue[$iThreadIdx], 'init');
my $iDirection = $iThreadIdx % 2 == 0 ? 1 : -1; # Size of files currently copied by this thread
# Initialize the starting and current queue index based in the total number of threads in relation to this thread
my $iQueueStartIdx = int((@{$$oCommand{param}{queue}} / $$oCommand{thread_total}) * $iThreadIdx);
my $iQueueIdx = $iQueueStartIdx;
# Keep track of progress
my $lSizeCurrent = 0; # Running total of bytes copied
# Loop through all the queues (exit when the original queue is reached)
do
{
&log(TRACE, "reading queue ${iQueueIdx}, start queue ${iQueueStartIdx}");
while (my $oMessage = ${$$oCommand{param}{queue}}[$iQueueIdx]->dequeue_nb())
{
if ($$oCommand{function} eq 'restore')
{
my $strSourcePath = (split(/\|/, $oMessage))[0];
my $strFileName = (split(/\|/, $oMessage))[1];
restoreFile($strSourcePath, $strFileName, $$oCommand{param}{copy_time_begin}, $$oCommand{param}{delta},
$$oCommand{param}{force}, $$oCommand{param}{backup_path}, $$oCommand{param}{source_compression},
$$oCommand{param}{current_user}, $$oCommand{param}{current_group}, $$oCommand{param}{manifest},
$oFile);
}
elsif ($$oCommand{function} eq 'backup')
{
my $bCopied; # Was the file copied?
my $lCopySize; # Size reported by copy
my $strCopyChecksum; # Checksum reported by copy
# Backup the file
($bCopied, $lSizeCurrent, $lCopySize, $strCopyChecksum) =
backupFile($oFile, $$oMessage{db_file}, $$oMessage{backup_file}, $$oCommand{param}{compress},
$$oMessage{checksum}, $$oMessage{checksum_only},
$$oMessage{size}, $$oCommand{param}{size_total}, $lSizeCurrent);
# If copy was successful store the checksum and size
if ($bCopied)
{
$$oCommand{param}{result_queue}->enqueue("checksum|$$oMessage{file_section}|" .
"$$oMessage{file}|${strCopyChecksum}|${lCopySize}");
}
# Else the file was removed during backup so remove from manifest
else
{
$$oCommand{param}{result_queue}->enqueue("remove|$$oMessage{file_section}|".
"$$oMessage{file}");
}
}
else
{
confess &log(ERROR, "unknown command");
}
}
# Even numbered threads move up when they have finished a queue, odd numbered threads move down
$iQueueIdx += $iDirection;
# Reset the queue index when it goes over or under the number of queues
if ($iQueueIdx < 0)
{
$iQueueIdx = @{$$oCommand{param}{queue}} - 1;
}
elsif ($iQueueIdx >= @{$$oCommand{param}{queue}})
{
$iQueueIdx = 0;
}
}
while ($iQueueIdx != $iQueueStartIdx);
# Notify parent of shutdown
threadMessage($oyResultQueue[$iThreadIdx], 'shutdown');
threadMessageExpect($oyMessageQueue[$iThreadIdx], 'continue');
# Destroy the file object
undef($oFile);
# Notify the parent process of thread exit
$oyResultQueue[$iThreadIdx]->enqueue('complete');
&log(TRACE, "$$oCommand{function} thread exiting");
}
}
####################################################################################################################################
# threadMessage
####################################################################################################################################
sub threadMessage
{
my $oQueue = shift;
my $strMessage = shift;
my $iThreadIdx = shift;
# Send the message
$oQueue->enqueue($strMessage);
# Define calling context
&log(TRACE, "sent message '${strMessage}' to " . (defined($iThreadIdx) ? 'thread ' . ($iThreadIdx + 1) : 'controller'));
}
####################################################################################################################################
# threadMessageExpect
####################################################################################################################################
sub threadMessageExpect
{
my $oQueue = shift;
my $strExpected = shift;
my $iThreadIdx = shift;
my $bNoBlock = shift;
# Set timeout based on the message type
my $iTimeout = defined($bNoBlock) ? undef: 600;
# Define calling context
my $strContext = defined($iThreadIdx) ? 'thread ' . ($iThreadIdx + 1) : 'controller';
# Wait for the message
my $strMessage;
if (defined($iTimeout))
{
&log(TRACE, "waiting for '${strExpected}' message from ${strContext}");
$strMessage = $oQueue->dequeue_timed($iTimeout);
}
else
{
$strMessage = $oQueue->dequeue_nb();
return false if !defined($strMessage);
}
# Throw an exeception when the message was not received
if (!defined($strMessage) || $strMessage ne $strExpected)
{
confess &log(ASSERT, "expected message '$strExpected' from ${strContext} but " .
(defined($strMessage) ? "got '$strMessage'" : "timed out after ${iTimeout} second(s)"));
}
&log(TRACE, "got '${strExpected}' message from ${strContext}");
return true;
}
####################################################################################################################################
# threadGroupRun
####################################################################################################################################
sub threadGroupRun
{
my $iThreadIdx = shift;
my $strFunction = shift;
my $oParam = shift;
my %oCommand;
$oCommand{function} = $strFunction;
$oCommand{thread_total} = @oyThread;
$oCommand{param} = $oParam;
$oyCommandQueue[$iThreadIdx]->enqueue(\%oCommand);
threadMessageExpect($oyResultQueue[$iThreadIdx], 'init', $iThreadIdx);
$byThreadRunning[$iThreadIdx] = true;
}
####################################################################################################################################
# threadGroupComplete
#
# Wait for threads to complete.
####################################################################################################################################
sub threadGroupComplete
{
my $self = shift;
my $iTimeout = shift;
my $bConfessOnError = shift;
# Set defaults
$bConfessOnError = defined($bConfessOnError) ? $bConfessOnError : true;
# Wait for all threads to complete and handle errors
my $iThreadComplete = 0;
my $lTimeBegin = time();
my $strFirstError;
my $iFirstErrorThreadIdx;
&log(DEBUG, "waiting for " . @oyThread . " threads to complete");
# Rejoin the threads
while ($iThreadComplete < @oyThread)
{
hsleep(.1);
# If a timeout has been defined, make sure we have not been running longer than that
if (defined($iTimeout))
{
if (time() - $lTimeBegin >= $iTimeout)
{
confess &log(ERROR, "threads have been running more than ${iTimeout} seconds, exiting...");
}
}
for (my $iThreadIdx = 0; $iThreadIdx < @oyThread; $iThreadIdx++)
{
if ($byThreadRunning[$iThreadIdx])
{
my $oError = $oyThread[$iThreadIdx]->error();
if (defined($oError))
{
my $strError;
if ($oError->isa('BackRest::Exception'))
{
$strError = $oError->message();
}
else
{
$strError = $oError;
&log(ERROR, "thread " . ($iThreadIdx) . ": ${strError}");
}
if (!defined($strFirstError))
{
$strFirstError = $strError;
$iFirstErrorThreadIdx = $iThreadIdx;
}
$byThreadRunning[$iThreadIdx] = false;
$iThreadComplete++;
}
elsif (threadMessageExpect($oyResultQueue[$iThreadIdx], 'shutdown', $iThreadIdx, true))
{
threadMessage($oyMessageQueue[$iThreadIdx], 'continue', $iThreadIdx);
threadMessageExpect($oyResultQueue[$iThreadIdx], 'complete', $iThreadIdx);
$byThreadRunning[$iThreadIdx] = false;
$iThreadComplete++;
}
}
}
}
&log(DEBUG, 'all threads exited');
if (defined($strFirstError) && $bConfessOnError)
{
confess &log(ERROR, 'error in thread' . ($iFirstErrorThreadIdx + 1) . ": $strFirstError");
}
}
####################################################################################################################################
# threadGroupDestroy
####################################################################################################################################
sub threadGroupDestroy
{
my $self = shift;
&log(TRACE, "waiting for " . @oyThread . " threads to be destroyed");
for (my $iThreadIdx = 0; $iThreadIdx < @oyThread; $iThreadIdx++)
{
my %oCommand;
$oCommand{function} = 'exit';
$oyCommandQueue[$iThreadIdx]->enqueue(\%oCommand);
hsleep(.1);
if ($oyThread[$iThreadIdx]->is_running())
{
$oyThread[$iThreadIdx]->kill('KILL')->join();
&log(TRACE, "thread ${iThreadIdx} killed");
}
elsif ($oyThread[$iThreadIdx]->is_joinable())
{
$oyThread[$iThreadIdx]->join();
&log(TRACE, "thread ${iThreadIdx} joined");
}
undef($oyThread[$iThreadIdx]);
}
&log(TRACE, @oyThread . " threads destroyed");
return(@oyThread);
}
1;