mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-01-30 05:39:12 +02:00
Using separate master queues
This commit is contained in:
parent
800cd89831
commit
857e177429
@ -42,8 +42,7 @@ my $bArchiveRequired;
|
||||
# Thread variables
|
||||
my @oThread;
|
||||
my @oThreadQueue;
|
||||
my @oThreadFile;
|
||||
my $oMasterQueue;
|
||||
my @oMasterQueue;
|
||||
my %oFileCopyMap;
|
||||
|
||||
####################################################################################################################################
|
||||
@ -59,11 +58,6 @@ sub backup_init
|
||||
my $iThreadMaxParam = shift;
|
||||
my $bArchiveRequiredParam = shift;
|
||||
|
||||
# for (my $iFileIdx = 0; $iFileIdx < scalar @oFile; $iFileIdx++)
|
||||
# {
|
||||
# undef($oFile[$iFileIdx]);
|
||||
# }
|
||||
|
||||
$oDb = $oDbParam;
|
||||
$oFile = $oFileParam;
|
||||
$strType = $strTypeParam;
|
||||
@ -71,12 +65,12 @@ sub backup_init
|
||||
$bNoChecksum = $bNoChecksumParam;
|
||||
$iThreadMax = $iThreadMaxParam;
|
||||
$bArchiveRequired = $bArchiveRequiredParam;
|
||||
|
||||
|
||||
if (!defined($iThreadMax))
|
||||
{
|
||||
$iThreadMax = 1;
|
||||
}
|
||||
|
||||
|
||||
if ($iThreadMax < 1 || $iThreadMax > 32)
|
||||
{
|
||||
confess &log(ERROR, "thread_max must be between 1 and 32");
|
||||
@ -109,15 +103,9 @@ sub thread_init
|
||||
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadActualTotal; $iThreadIdx++)
|
||||
{
|
||||
$oThreadQueue[$iThreadIdx] = Thread::Queue->new();
|
||||
|
||||
# if (!defined($oFile[$iThreadIdx + 1]))
|
||||
# {
|
||||
# $oFileThread[$iThreadIdx] = $oFile->clone($iThreadIdx + 1);
|
||||
# }
|
||||
$oMasterQueue[$iThreadIdx] = Thread::Queue->new();
|
||||
}
|
||||
|
||||
$oMasterQueue = Thread::Queue->new();
|
||||
|
||||
return $iThreadActualTotal;
|
||||
}
|
||||
|
||||
@ -1037,47 +1025,50 @@ sub backup_file
|
||||
# Read the messages that we passed back from the threads. These should be two types:
|
||||
# 1) remove - files that were skipped because they were removed from the database during backup
|
||||
# 2) checksum - file checksums calculated by the threads
|
||||
while (my $strMessage = $oMasterQueue->dequeue_nb())
|
||||
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
|
||||
{
|
||||
&log (DEBUG, "message received in master queue: ${strMessage}");
|
||||
|
||||
# Split the message. Currently using | as the split character. Not ideal, but it will do for now.
|
||||
my @strSplit = split(/\|/, $strMessage);
|
||||
|
||||
my $strCommand = $strSplit[0]; # Command to execute on a file
|
||||
my $strFileSection = $strSplit[1]; # File section where the file is located
|
||||
my $strFile = $strSplit[2]; # The file to act on
|
||||
|
||||
# These three parts are required
|
||||
if (!defined($strCommand) || !defined($strFileSection) || !defined($strFile))
|
||||
while (my $strMessage = $oMasterQueue[$iThreadIdx]->dequeue_nb())
|
||||
{
|
||||
confess &log(ASSERT, "thread messages must have strCommand, strFileSection and strFile defined");
|
||||
}
|
||||
&log (DEBUG, "message received in master queue: ${strMessage}");
|
||||
|
||||
&log (DEBUG, "command = ${strCommand}, file_section = ${strFileSection}, file = ${strFile}");
|
||||
# Split the message. Currently using | as the split character. Not ideal, but it will do for now.
|
||||
my @strSplit = split(/\|/, $strMessage);
|
||||
|
||||
# If command is "remove" then mark the skipped file in the manifest
|
||||
if ($strCommand eq "remove")
|
||||
{
|
||||
delete ${$oBackupManifestRef}{"${strFileSection}"}{"$strFile"};
|
||||
my $strCommand = $strSplit[0]; # Command to execute on a file
|
||||
my $strFileSection = $strSplit[1]; # File section where the file is located
|
||||
my $strFile = $strSplit[2]; # The file to act on
|
||||
|
||||
&log (INFO, "marked skipped ${strFileSection}:${strFile} from the manifest");
|
||||
}
|
||||
# If command is "checksum" then record the checksum in the manifest
|
||||
elsif ($strCommand eq "checksum")
|
||||
{
|
||||
my $strChecksum = $strSplit[3]; # File checksum calculated by the thread
|
||||
|
||||
# Checksum must be defined
|
||||
if (!defined($strChecksum))
|
||||
# These three parts are required
|
||||
if (!defined($strCommand) || !defined($strFileSection) || !defined($strFile))
|
||||
{
|
||||
confess &log(ASSERT, "thread checksum messages must have strChecksum defined");
|
||||
confess &log(ASSERT, "thread messages must have strCommand, strFileSection and strFile defined");
|
||||
}
|
||||
|
||||
${$oBackupManifestRef}{"${strFileSection}"}{"$strFile"}{checksum} = $strChecksum;
|
||||
&log (DEBUG, "command = ${strCommand}, file_section = ${strFileSection}, file = ${strFile}");
|
||||
|
||||
# Log the checksum
|
||||
&log (DEBUG, "write checksum ${strFileSection}:${strFile} into manifest: ${strChecksum}");
|
||||
# If command is "remove" then mark the skipped file in the manifest
|
||||
if ($strCommand eq "remove")
|
||||
{
|
||||
delete ${$oBackupManifestRef}{"${strFileSection}"}{"$strFile"};
|
||||
|
||||
&log (INFO, "marked skipped ${strFileSection}:${strFile} from the manifest");
|
||||
}
|
||||
# If command is "checksum" then record the checksum in the manifest
|
||||
elsif ($strCommand eq "checksum")
|
||||
{
|
||||
my $strChecksum = $strSplit[3]; # File checksum calculated by the thread
|
||||
|
||||
# Checksum must be defined
|
||||
if (!defined($strChecksum))
|
||||
{
|
||||
confess &log(ASSERT, "thread checksum messages must have strChecksum defined");
|
||||
}
|
||||
|
||||
${$oBackupManifestRef}{"${strFileSection}"}{"$strFile"}{checksum} = $strChecksum;
|
||||
|
||||
# Log the checksum
|
||||
&log (DEBUG, "write checksum ${strFileSection}:${strFile} into manifest: ${strChecksum}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1115,6 +1106,8 @@ sub backup_file_thread
|
||||
undef, $oFileCopyMap{$strFile}{modification_time},
|
||||
undef, $bPathCreate, false))
|
||||
{
|
||||
&log(DEBUG, "thread ${iThreadIdx} unable to copy file: " . $oFileCopyMap{$strFile}{db_file});
|
||||
|
||||
# If the copy fails then see if the file still exists on the database
|
||||
if (!$oFileThread->file_exists(PATH_DB_ABSOLUTE, $oFileCopyMap{$strFile}{db_file}))
|
||||
{
|
||||
@ -1127,7 +1120,7 @@ sub backup_file_thread
|
||||
}
|
||||
|
||||
# Write a message into the master queue to have the file removed from the manifest
|
||||
$oMasterQueue->enqueue("remove|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}");
|
||||
$oMasterQueue[$iThreadIdx]->enqueue("remove|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}");
|
||||
|
||||
# Move on to the next file
|
||||
next;
|
||||
@ -1140,7 +1133,7 @@ sub backup_file_thread
|
||||
my $strChecksum = $oFileThread->file_hash_get(PATH_BACKUP_TMP, $oFileCopyMap{$strFile}{backup_file});
|
||||
|
||||
# Write the checksum message into the master queue
|
||||
$oMasterQueue->enqueue("checksum|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}|${strChecksum}");
|
||||
$oMasterQueue[$iThreadIdx]->enqueue("checksum|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}|${strChecksum}");
|
||||
|
||||
&log(INFO, $strLog . " checksum ${strChecksum}");
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user