diff --git a/pg_backrest_backup.pm b/pg_backrest_backup.pm index b0e64f379..8e4bf6a06 100644 --- a/pg_backrest_backup.pm +++ b/pg_backrest_backup.pm @@ -42,8 +42,7 @@ my $bArchiveRequired; # Thread variables my @oThread; my @oThreadQueue; -my @oThreadFile; -my $oMasterQueue; +my @oMasterQueue; my %oFileCopyMap; #################################################################################################################################### @@ -59,11 +58,6 @@ sub backup_init my $iThreadMaxParam = shift; my $bArchiveRequiredParam = shift; -# for (my $iFileIdx = 0; $iFileIdx < scalar @oFile; $iFileIdx++) -# { -# undef($oFile[$iFileIdx]); -# } - $oDb = $oDbParam; $oFile = $oFileParam; $strType = $strTypeParam; @@ -71,12 +65,12 @@ sub backup_init $bNoChecksum = $bNoChecksumParam; $iThreadMax = $iThreadMaxParam; $bArchiveRequired = $bArchiveRequiredParam; - + if (!defined($iThreadMax)) { $iThreadMax = 1; } - + if ($iThreadMax < 1 || $iThreadMax > 32) { confess &log(ERROR, "thread_max must be between 1 and 32"); @@ -109,15 +103,9 @@ sub thread_init for (my $iThreadIdx = 0; $iThreadIdx < $iThreadActualTotal; $iThreadIdx++) { $oThreadQueue[$iThreadIdx] = Thread::Queue->new(); - -# if (!defined($oFile[$iThreadIdx + 1])) -# { -# $oFileThread[$iThreadIdx] = $oFile->clone($iThreadIdx + 1); -# } + $oMasterQueue[$iThreadIdx] = Thread::Queue->new(); } - $oMasterQueue = Thread::Queue->new(); - return $iThreadActualTotal; } @@ -1037,47 +1025,50 @@ sub backup_file # Read the messages that we passed back from the threads. These should be two types: # 1) remove - files that were skipped because they were removed from the database during backup # 2) checksum - file checksums calculated by the threads - while (my $strMessage = $oMasterQueue->dequeue_nb()) + for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) { - &log (DEBUG, "message received in master queue: ${strMessage}"); - - # Split the message. Currently using | as the split character. Not ideal, but it will do for now. - my @strSplit = split(/\|/, $strMessage); - - my $strCommand = $strSplit[0]; # Command to execute on a file - my $strFileSection = $strSplit[1]; # File section where the file is located - my $strFile = $strSplit[2]; # The file to act on - - # These three parts are required - if (!defined($strCommand) || !defined($strFileSection) || !defined($strFile)) + while (my $strMessage = $oMasterQueue[$iThreadIdx]->dequeue_nb()) { - confess &log(ASSERT, "thread messages must have strCommand, strFileSection and strFile defined"); - } + &log (DEBUG, "message received in master queue: ${strMessage}"); - &log (DEBUG, "command = ${strCommand}, file_section = ${strFileSection}, file = ${strFile}"); + # Split the message. Currently using | as the split character. Not ideal, but it will do for now. + my @strSplit = split(/\|/, $strMessage); - # If command is "remove" then mark the skipped file in the manifest - if ($strCommand eq "remove") - { - delete ${$oBackupManifestRef}{"${strFileSection}"}{"$strFile"}; + my $strCommand = $strSplit[0]; # Command to execute on a file + my $strFileSection = $strSplit[1]; # File section where the file is located + my $strFile = $strSplit[2]; # The file to act on - &log (INFO, "marked skipped ${strFileSection}:${strFile} from the manifest"); - } - # If command is "checksum" then record the checksum in the manifest - elsif ($strCommand eq "checksum") - { - my $strChecksum = $strSplit[3]; # File checksum calculated by the thread - - # Checksum must be defined - if (!defined($strChecksum)) + # These three parts are required + if (!defined($strCommand) || !defined($strFileSection) || !defined($strFile)) { - confess &log(ASSERT, "thread checksum messages must have strChecksum defined"); + confess &log(ASSERT, "thread messages must have strCommand, strFileSection and strFile defined"); } - ${$oBackupManifestRef}{"${strFileSection}"}{"$strFile"}{checksum} = $strChecksum; + &log (DEBUG, "command = ${strCommand}, file_section = ${strFileSection}, file = ${strFile}"); - # Log the checksum - &log (DEBUG, "write checksum ${strFileSection}:${strFile} into manifest: ${strChecksum}"); + # If command is "remove" then mark the skipped file in the manifest + if ($strCommand eq "remove") + { + delete ${$oBackupManifestRef}{"${strFileSection}"}{"$strFile"}; + + &log (INFO, "marked skipped ${strFileSection}:${strFile} from the manifest"); + } + # If command is "checksum" then record the checksum in the manifest + elsif ($strCommand eq "checksum") + { + my $strChecksum = $strSplit[3]; # File checksum calculated by the thread + + # Checksum must be defined + if (!defined($strChecksum)) + { + confess &log(ASSERT, "thread checksum messages must have strChecksum defined"); + } + + ${$oBackupManifestRef}{"${strFileSection}"}{"$strFile"}{checksum} = $strChecksum; + + # Log the checksum + &log (DEBUG, "write checksum ${strFileSection}:${strFile} into manifest: ${strChecksum}"); + } } } } @@ -1115,6 +1106,8 @@ sub backup_file_thread undef, $oFileCopyMap{$strFile}{modification_time}, undef, $bPathCreate, false)) { + &log(DEBUG, "thread ${iThreadIdx} unable to copy file: " . $oFileCopyMap{$strFile}{db_file}); + # If the copy fails then see if the file still exists on the database if (!$oFileThread->file_exists(PATH_DB_ABSOLUTE, $oFileCopyMap{$strFile}{db_file})) { @@ -1127,7 +1120,7 @@ sub backup_file_thread } # Write a message into the master queue to have the file removed from the manifest - $oMasterQueue->enqueue("remove|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}"); + $oMasterQueue[$iThreadIdx]->enqueue("remove|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}"); # Move on to the next file next; @@ -1140,7 +1133,7 @@ sub backup_file_thread my $strChecksum = $oFileThread->file_hash_get(PATH_BACKUP_TMP, $oFileCopyMap{$strFile}{backup_file}); # Write the checksum message into the master queue - $oMasterQueue->enqueue("checksum|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}|${strChecksum}"); + $oMasterQueue[$iThreadIdx]->enqueue("checksum|$oFileCopyMap{$strFile}{file_section}|$oFileCopyMap{$strFile}{file}|${strChecksum}"); &log(INFO, $strLog . " checksum ${strChecksum}"); }