You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-07-13 01:00:23 +02:00
Backup/restore copy will be run in the main process when thread-max=1. I've resisted this change because it adds complexity, but I have to accept that threads are not stable on all platforms. Or maybe any platform.
This commit is contained in:
@ -1047,86 +1047,106 @@ sub backup_file
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# End each thread queue and start the backup_file threads
|
if ($iThreadLocalMax > 1)
|
||||||
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
|
|
||||||
{
|
{
|
||||||
# Output info about how much work each thread is going to do
|
# End each thread queue and start the backup_file threads
|
||||||
&log(DEBUG, "thread ${iThreadIdx} large total $oyThreadData[$iThreadIdx]{large_total}, " .
|
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
|
||||||
"size $oyThreadData[$iThreadIdx]{large_size}");
|
|
||||||
&log(DEBUG, "thread ${iThreadIdx} small total $oyThreadData[$iThreadIdx]{small_total}, " .
|
|
||||||
"size $oyThreadData[$iThreadIdx]{small_size}");
|
|
||||||
|
|
||||||
# Start the thread
|
|
||||||
$oThread[$iThreadIdx] = threads->create(\&backup_file_thread, $iThreadIdx, !$bNoChecksum, !$bPathCreate,
|
|
||||||
$oyThreadData[$iThreadIdx]{size});
|
|
||||||
}
|
|
||||||
|
|
||||||
# Wait for the threads to complete
|
|
||||||
backup_thread_complete($iThreadTimeout);
|
|
||||||
|
|
||||||
# Read the messages that we passed back from the threads. These should be two types:
|
|
||||||
# 1) remove - files that were skipped because they were removed from the database during backup
|
|
||||||
# 2) checksum - file checksums calculated by the threads
|
|
||||||
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
|
|
||||||
{
|
|
||||||
while (my $strMessage = $oMasterQueue[$iThreadIdx]->dequeue_nb())
|
|
||||||
{
|
{
|
||||||
&log (DEBUG, "message received in master queue: ${strMessage}");
|
# Output info about how much work each thread is going to do
|
||||||
|
&log(DEBUG, "thread ${iThreadIdx} large total $oyThreadData[$iThreadIdx]{large_total}, " .
|
||||||
|
"size $oyThreadData[$iThreadIdx]{large_size}");
|
||||||
|
&log(DEBUG, "thread ${iThreadIdx} small total $oyThreadData[$iThreadIdx]{small_total}, " .
|
||||||
|
"size $oyThreadData[$iThreadIdx]{small_size}");
|
||||||
|
|
||||||
# Split the message. Currently using | as the split character. Not ideal, but it will do for now.
|
# Start the thread
|
||||||
my @strSplit = split(/\|/, $strMessage);
|
$oThread[$iThreadIdx] = threads->create(\&backup_file_thread, true, $iThreadIdx, !$bNoChecksum, !$bPathCreate,
|
||||||
|
$oyThreadData[$iThreadIdx]{size}, $oBackupManifest);
|
||||||
|
}
|
||||||
|
|
||||||
my $strCommand = $strSplit[0]; # Command to execute on a file
|
# Wait for the threads to complete
|
||||||
my $strFileSection = $strSplit[1]; # File section where the file is located
|
backup_thread_complete($iThreadTimeout);
|
||||||
my $strFile = $strSplit[2]; # The file to act on
|
|
||||||
|
|
||||||
# These three parts are required
|
# Read the messages that we passed back from the threads. These should be two types:
|
||||||
if (!defined($strCommand) || !defined($strFileSection) || !defined($strFile))
|
# 1) remove - files that were skipped because they were removed from the database during backup
|
||||||
|
# 2) checksum - file checksums calculated by the threads
|
||||||
|
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
|
||||||
|
{
|
||||||
|
while (my $strMessage = $oMasterQueue[$iThreadIdx]->dequeue_nb())
|
||||||
{
|
{
|
||||||
confess &log(ASSERT, 'thread messages must have strCommand, strFileSection and strFile defined');
|
&log (DEBUG, "message received in master queue: ${strMessage}");
|
||||||
}
|
|
||||||
|
|
||||||
&log (DEBUG, "command = ${strCommand}, file_section = ${strFileSection}, file = ${strFile}");
|
# Split the message. Currently using | as the split character. Not ideal, but it will do for now.
|
||||||
|
my @strSplit = split(/\|/, $strMessage);
|
||||||
|
|
||||||
# If command is 'remove' then mark the skipped file in the manifest
|
my $strCommand = $strSplit[0]; # Command to execute on a file
|
||||||
if ($strCommand eq 'remove')
|
my $strFileSection = $strSplit[1]; # File section where the file is located
|
||||||
{
|
my $strFile = $strSplit[2]; # The file to act on
|
||||||
$oBackupManifest->remove($strFileSection, $strFile);
|
|
||||||
|
|
||||||
&log (INFO, "removed file ${strFileSection}:${strFile} from the manifest (it was removed by db during backup)");
|
# These three parts are required
|
||||||
}
|
if (!defined($strCommand) || !defined($strFileSection) || !defined($strFile))
|
||||||
# If command is 'checksum' then record the checksum in the manifest
|
|
||||||
elsif ($strCommand eq 'checksum')
|
|
||||||
{
|
|
||||||
my $strChecksum = $strSplit[3]; # File checksum calculated by the thread
|
|
||||||
|
|
||||||
# Checksum must be defined
|
|
||||||
if (!defined($strChecksum))
|
|
||||||
{
|
{
|
||||||
confess &log(ASSERT, 'thread checksum messages must have strChecksum defined');
|
confess &log(ASSERT, 'thread messages must have strCommand, strFileSection and strFile defined');
|
||||||
}
|
}
|
||||||
|
|
||||||
$oBackupManifest->set($strFileSection, $strFile, MANIFEST_SUBKEY_CHECKSUM, $strChecksum);
|
&log (DEBUG, "command = ${strCommand}, file_section = ${strFileSection}, file = ${strFile}");
|
||||||
|
|
||||||
# Log the checksum
|
# If command is 'remove' then mark the skipped file in the manifest
|
||||||
&log (DEBUG, "write checksum ${strFileSection}:${strFile} into manifest: ${strChecksum}");
|
if ($strCommand eq 'remove')
|
||||||
|
{
|
||||||
|
$oBackupManifest->remove($strFileSection, $strFile);
|
||||||
|
|
||||||
|
&log (INFO, "removed file ${strFileSection}:${strFile} from the manifest (it was removed by db during backup)");
|
||||||
|
}
|
||||||
|
# If command is 'checksum' then record the checksum in the manifest
|
||||||
|
elsif ($strCommand eq 'checksum')
|
||||||
|
{
|
||||||
|
my $strChecksum = $strSplit[3]; # File checksum calculated by the thread
|
||||||
|
|
||||||
|
# Checksum must be defined
|
||||||
|
if (!defined($strChecksum))
|
||||||
|
{
|
||||||
|
confess &log(ASSERT, 'thread checksum messages must have strChecksum defined');
|
||||||
|
}
|
||||||
|
|
||||||
|
$oBackupManifest->set($strFileSection, $strFile, MANIFEST_SUBKEY_CHECKSUM, $strChecksum);
|
||||||
|
|
||||||
|
# Log the checksum
|
||||||
|
&log (DEBUG, "write checksum ${strFileSection}:${strFile} into manifest: ${strChecksum}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
&log(DEBUG, "starting backup in main process");
|
||||||
|
backup_file_thread(false, 0, !$bNoChecksum, !$bPathCreate, $oyThreadData[0]{size}, $oBackupManifest);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sub backup_file_thread
|
sub backup_file_thread
|
||||||
{
|
{
|
||||||
my @args = @_;
|
my @args = @_;
|
||||||
|
|
||||||
my $iThreadIdx = $args[0]; # Defines the index of this thread
|
my $bMulti = $args[0]; # Is this thread one of many?
|
||||||
my $bChecksum = $args[1]; # Should checksums be generated on files after they have been backed up?
|
my $iThreadIdx = $args[1]; # Defines the index of this thread
|
||||||
my $bPathCreate = $args[2]; # Should paths be created automatically?
|
my $bChecksum = $args[2]; # Should checksums be generated on files after they have been backed up?
|
||||||
my $lSizeTotal = $args[3]; # Total size of the files to be copied by this thread
|
my $bPathCreate = $args[3]; # Should paths be created automatically?
|
||||||
|
my $lSizeTotal = $args[4]; # Total size of the files to be copied by this thread
|
||||||
|
my $oBackupManifest = $args[5]; # Backup manifest object (only used when single-threaded)
|
||||||
|
|
||||||
my $lSize = 0; # Size of files currently copied by this thread
|
my $lSize = 0; # Size of files currently copied by this thread
|
||||||
my $strLog; # Store the log message
|
my $strLog; # Store the log message
|
||||||
my $oFileThread = $oFile->clone($iThreadIdx); # Thread local file object
|
my $oFileThread; # Thread local file object
|
||||||
|
|
||||||
|
# If multi-threaded, then clone the file object
|
||||||
|
if ($bMulti)
|
||||||
|
{
|
||||||
|
$oFileThread = $oFile->clone($iThreadIdx);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
$oFileThread = $oFile;
|
||||||
|
}
|
||||||
|
|
||||||
# When a KILL signal is received, immediately abort
|
# When a KILL signal is received, immediately abort
|
||||||
$SIG{'KILL'} = sub {threads->exit();};
|
$SIG{'KILL'} = sub {threads->exit();};
|
||||||
@ -1158,8 +1178,18 @@ sub backup_file_thread
|
|||||||
# If file is missing assume the database removed it (else corruption and nothing we can do!)
|
# If file is missing assume the database removed it (else corruption and nothing we can do!)
|
||||||
&log(INFO, "thread ${iThreadIdx} skipped file removed by database: " . $oFileCopyMap{$strFile}{db_file});
|
&log(INFO, "thread ${iThreadIdx} skipped file removed by database: " . $oFileCopyMap{$strFile}{db_file});
|
||||||
|
|
||||||
# Write a message into the master queue to have the file removed from the manifest
|
# Remove file from the manifest
|
||||||
$oMasterQueue[$iThreadIdx]->enqueue("remove|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}");
|
if ($bMulti)
|
||||||
|
{
|
||||||
|
# Write a message into the master queue to have the file removed from the manifest
|
||||||
|
$oMasterQueue[$iThreadIdx]->enqueue("remove|$oFileCopyMap{$strFile}{file_section}|".
|
||||||
|
"$oFileCopyMap{$strFile}{file}");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
# remove it directly
|
||||||
|
$oBackupManifest->remove($oFileCopyMap{$strFile}{file_section}, $oFileCopyMap{$strFile}{file});
|
||||||
|
}
|
||||||
|
|
||||||
# Move on to the next file
|
# Move on to the next file
|
||||||
next;
|
next;
|
||||||
@ -1173,8 +1203,19 @@ sub backup_file_thread
|
|||||||
my $strChecksum = $oFileThread->hash(PATH_BACKUP_TMP,
|
my $strChecksum = $oFileThread->hash(PATH_BACKUP_TMP,
|
||||||
$oFileCopyMap{$strFile}{backup_file} . ($bCompress ? '.gz' : ''), $bCompress);
|
$oFileCopyMap{$strFile}{backup_file} . ($bCompress ? '.gz' : ''), $bCompress);
|
||||||
|
|
||||||
# Write the checksum message into the master queue
|
# Store checksum in the manifest
|
||||||
$oMasterQueue[$iThreadIdx]->enqueue("checksum|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}|${strChecksum}");
|
if ($bMulti)
|
||||||
|
{
|
||||||
|
# Write the checksum message into the master queue
|
||||||
|
$oMasterQueue[$iThreadIdx]->enqueue("checksum|$oFileCopyMap{$strFile}{file_section}|" .
|
||||||
|
"$oFileCopyMap{$strFile}{file}|${strChecksum}");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
# Write it directly
|
||||||
|
$oBackupManifest->set($oFileCopyMap{$strFile}{file_section}, $oFileCopyMap{$strFile}{file},
|
||||||
|
MANIFEST_SUBKEY_CHECKSUM, $strChecksum);
|
||||||
|
}
|
||||||
|
|
||||||
# Output information about the file to be checksummed
|
# Output information about the file to be checksummed
|
||||||
if (!defined($strLog))
|
if (!defined($strLog))
|
||||||
|
@ -587,17 +587,28 @@ sub restore
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Create threads to process the thread queues
|
# If multi-threaded then create threads to copy files
|
||||||
my $oThreadGroup = thread_group_create();
|
if ($self->{iThreadTotal} > 1)
|
||||||
|
|
||||||
for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++)
|
|
||||||
{
|
{
|
||||||
&log(DEBUG, "starting restore thread ${iThreadIdx}");
|
# Create threads to process the thread queues
|
||||||
thread_group_add($oThreadGroup, threads->create(\&restore_thread, $self, $iThreadIdx, \@oyRestoreQueue, $oManifest));
|
my $oThreadGroup = thread_group_create();
|
||||||
}
|
|
||||||
|
|
||||||
# Complete thread queues
|
for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++)
|
||||||
thread_group_complete($oThreadGroup);
|
{
|
||||||
|
&log(DEBUG, "starting restore thread ${iThreadIdx}");
|
||||||
|
thread_group_add($oThreadGroup, threads->create(\&restore_thread, true, $self,
|
||||||
|
$iThreadIdx, \@oyRestoreQueue, $oManifest));
|
||||||
|
}
|
||||||
|
|
||||||
|
# Complete thread queues
|
||||||
|
thread_group_complete($oThreadGroup);
|
||||||
|
}
|
||||||
|
# Else copy in the main process
|
||||||
|
else
|
||||||
|
{
|
||||||
|
&log(DEBUG, "starting restore in main process");
|
||||||
|
$self->restore_thread(false, 0, \@oyRestoreQueue, $oManifest);
|
||||||
|
}
|
||||||
|
|
||||||
# Create recovery.conf file
|
# Create recovery.conf file
|
||||||
$self->recovery();
|
$self->recovery();
|
||||||
@ -611,12 +622,24 @@ sub restore
|
|||||||
sub restore_thread
|
sub restore_thread
|
||||||
{
|
{
|
||||||
my $self = shift; # Class hash
|
my $self = shift; # Class hash
|
||||||
|
my $bMulti = shift; # Is this thread one of many?
|
||||||
my $iThreadIdx = shift; # Defines the index of this thread
|
my $iThreadIdx = shift; # Defines the index of this thread
|
||||||
my $oyRestoreQueueRef = shift; # Restore queues
|
my $oyRestoreQueueRef = shift; # Restore queues
|
||||||
my $oManifest = shift; # Backup manifest
|
my $oManifest = shift; # Backup manifest
|
||||||
|
|
||||||
my $iDirection = $iThreadIdx % 2 == 0 ? 1 : -1; # Size of files currently copied by this thread
|
my $iDirection = $iThreadIdx % 2 == 0 ? 1 : -1; # Size of files currently copied by this thread
|
||||||
my $oFileThread = $self->{oFile}->clone($iThreadIdx); # Thread local file object
|
my $oFileThread = $self->{oFile}; # Thread local file object
|
||||||
|
|
||||||
|
# If multi-threaded, then clone the file object
|
||||||
|
if ($bMulti)
|
||||||
|
{
|
||||||
|
$oFileThread = $self->{oFile}->clone($iThreadIdx);
|
||||||
|
}
|
||||||
|
# Else use the master file object
|
||||||
|
else
|
||||||
|
{
|
||||||
|
$oFileThread = $self->{oFile};
|
||||||
|
}
|
||||||
|
|
||||||
# Initialize the starting and current queue index based in the total number of threads in relation to this thread
|
# Initialize the starting and current queue index based in the total number of threads in relation to this thread
|
||||||
my $iQueueStartIdx = int((@{$oyRestoreQueueRef} / $self->{iThreadTotal}) * $iThreadIdx);
|
my $iQueueStartIdx = int((@{$oyRestoreQueueRef} / $self->{iThreadTotal}) * $iThreadIdx);
|
||||||
|
Reference in New Issue
Block a user