diff --git a/README.md b/README.md index 9d993bd7b..9fba2baaf 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,8 @@ cpanm Net::OpenSSH cpanm IPC::System::Simple cpanm Digest::SHA cpanm Compress::ZLib +cpanm threads (update this package) +cpanm Thread::Queue (update this package) ``` * Install PgBackRest @@ -696,12 +698,14 @@ example: db-path=/data/db ## Release Notes -### v0.60: TBD +### v0.60: **************TBD*************** * Pushing duplicate WAL now generates an error. This worked before only if checksums were disabled. * Database System IDs are used to make sure that all WAL in an archive matches up. This should help prevent misconfigurations that send WAL from multiple clusters to the same archive. +* Regression tests working back to PostgreSQL 8.3. + ### v0.50: restore and much more * Added restore functionality. diff --git a/bin/pg_backrest.pl b/bin/pg_backrest.pl index a5fae2630..9e20f9610 100755 --- a/bin/pg_backrest.pl +++ b/bin/pg_backrest.pl @@ -22,6 +22,7 @@ use BackRest::File; use BackRest::Archive; use BackRest::Backup; use BackRest::Restore; +use BackRest::ThreadGroup; #################################################################################################################################### # Usage @@ -75,12 +76,6 @@ pg_backrest.pl [options] [operation] =cut -#################################################################################################################################### -# Global variables -#################################################################################################################################### -my $oRemote; # Remote protocol object -my $oLocal; # Local protocol object - #################################################################################################################################### # SAFE_EXIT - terminate all SSH sessions when the script is terminated #################################################################################################################################### @@ -88,13 +83,14 @@ sub safe_exit { my $iExitCode = shift; + my $iTotal = threadGroupDestroy(); + remoteDestroy(); + if (defined($iExitCode)) { exit $iExitCode; } - my $iTotal = backup_thread_kill(); - &log(ERROR, "process terminated on signal or exception, ${iTotal} threads stopped"); } @@ -126,6 +122,16 @@ if (operationTest(OP_ARCHIVE_PUSH) || operationTest(OP_ARCHIVE_GET)) safe_exit(new BackRest::Archive()->process()); } +#################################################################################################################################### +# Open the log file +#################################################################################################################################### +log_file_set(optionGet(OPTION_REPO_PATH) . '/log/' . optionGet(OPTION_STANZA) . '-' . lc(operationGet())); + +#################################################################################################################################### +# Create the thread group that will be used for parallel processing +#################################################################################################################################### +threadGroupCreate(); + #################################################################################################################################### # Initialize the default file object #################################################################################################################################### @@ -147,9 +153,6 @@ if (operationTest(OP_RESTORE)) confess &log(ASSERT, 'restore operation must be performed locally on the db server'); } - # Open the log file - log_file_set(optionGet(OPTION_REPO_PATH) . '/log/' . optionGet(OPTION_STANZA) . '-restore'); - # Set the lock path my $strLockPath = optionGet(OPTION_REPO_PATH) . '/lock/' . optionGet(OPTION_STANZA) . '-' . operationGet() . '.lock'; @@ -181,9 +184,6 @@ if (operationTest(OP_RESTORE)) #################################################################################################################################### # GET MORE CONFIG INFO #################################################################################################################################### -# Open the log file -log_file_set(optionGet(OPTION_REPO_PATH) . '/log/' . optionGet(OPTION_STANZA)); - # Make sure backup and expire operations happen on the backup side if (optionRemoteTypeTest(BACKUP)) { diff --git a/doc/doc.xml b/doc/doc.xml index 6a0c33501..c8e5fbace 100644 --- a/doc/doc.xml +++ b/doc/doc.xml @@ -61,6 +61,8 @@ cpanm IPC::System::Simple cpanm Digest::SHA cpanm Compress::ZLib + cpanm threads (update this package) + cpanm Thread::Queue (update this package) * Install PgBackRest diff --git a/lib/BackRest/Backup.pm b/lib/BackRest/Backup.pm index cb05ef355..0d29b7a79 100644 --- a/lib/BackRest/Backup.pm +++ b/lib/BackRest/Backup.pm @@ -21,30 +21,24 @@ use BackRest::Config; use BackRest::Manifest; use BackRest::File; use BackRest::Db; +use BackRest::ThreadGroup; use BackRest::Archive; +use BackRest::BackupFile; use Exporter qw(import); -our @EXPORT = qw(backup_init backup_cleanup backup_thread_kill backup backup_expire archive_list_get); +our @EXPORT = qw(backup_init backup_cleanup backup backup_expire archive_list_get); my $oDb; my $oFile; my $strType; # Type of backup: full, differential (diff), incremental (incr) my $bCompress; my $bHardLink; -my $iThreadMax; -my $iThreadLocalMax; -my $iSmallFileThreshold = 65536; my $bNoStartStop; my $bForce; +my $iThreadMax; my $iThreadTimeout; -# Thread variables -my @oThread; -my @oThreadQueue; -my @oMasterQueue; -my %oFileCopyMap; - #################################################################################################################################### # BACKUP_INIT #################################################################################################################################### @@ -69,16 +63,6 @@ sub backup_init $iThreadTimeout = $iThreadTimeoutParam; $bNoStartStop = $bNoStartStopParam; $bForce = $bForceParam; - - if (!defined($iThreadMax)) - { - $iThreadMax = 1; - } - - if ($iThreadMax < 1 || $iThreadMax > 32) - { - confess &log(ERROR, 'thread_max must be between 1 and 32'); - } } #################################################################################################################################### @@ -89,142 +73,6 @@ sub backup_cleanup undef($oFile); } -#################################################################################################################################### -# THREAD_INIT -#################################################################################################################################### -sub thread_init -{ - my $iThreadRequestTotal = shift; # Number of threads that were requested - - my $iThreadActualTotal; # Number of actual threads assigned - - if (!defined($iThreadRequestTotal)) - { - $iThreadActualTotal = $iThreadMax; - } - else - { - $iThreadActualTotal = $iThreadRequestTotal < $iThreadMax ? $iThreadRequestTotal : $iThreadMax; - - if ($iThreadActualTotal < 1) - { - $iThreadActualTotal = 1; - } - } - - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadActualTotal; $iThreadIdx++) - { - $oThreadQueue[$iThreadIdx] = Thread::Queue->new(); - $oMasterQueue[$iThreadIdx] = Thread::Queue->new(); - } - - return $iThreadActualTotal; -} - -#################################################################################################################################### -# BACKUP_THREAD_KILL -#################################################################################################################################### -sub backup_thread_kill -{ - my $iTotal = 0; - - for (my $iThreadIdx = 0; $iThreadIdx < scalar @oThread; $iThreadIdx++) - { - if (defined($oThread[$iThreadIdx])) - { - if ($oThread[$iThreadIdx]->is_running()) - { - $oThread[$iThreadIdx]->kill('KILL')->join(); - } - elsif ($oThread[$iThreadIdx]->is_joinable()) - { - $oThread[$iThreadIdx]->join(); - } - - undef($oThread[$iThreadIdx]); - $iTotal++; - } - } - - return($iTotal); -} - -#################################################################################################################################### -# BACKUP_THREAD_COMPLETE -#################################################################################################################################### -sub backup_thread_complete -{ - my $iTimeout = shift; - my $bConfessOnError = shift; - - if (!defined($bConfessOnError)) - { - $bConfessOnError = true; - } - -# if (!defined($iTimeout)) -# { -# &log(WARN, "no thread timeout was set"); -# } - - # Wait for all threads to complete and handle errors - my $iThreadComplete = 0; - my $lTimeBegin = time(); - - # Rejoin the threads - while ($iThreadComplete < $iThreadLocalMax) - { - # If a timeout has been defined, make sure we have not been running longer than that - if (defined($iTimeout)) - { - if (time() - $lTimeBegin >= $iTimeout) - { - confess &log(ERROR, "threads have been running more than ${iTimeout} seconds, exiting..."); - - #backup_thread_kill(); - - #confess &log(WARN, "all threads have exited, aborting..."); - } - } - - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) - { - if (defined($oThread[$iThreadIdx])) - { - if (defined($oThread[$iThreadIdx]->error())) - { - backup_thread_kill(); - - if ($bConfessOnError) - { - confess &log(ERROR, 'error in thread ' . (${iThreadIdx} + 1) . ': check log for details'); - } - else - { - return false; - } - } - - if ($oThread[$iThreadIdx]->is_joinable()) - { - &log(DEBUG, "thread ${iThreadIdx} exited"); - $oThread[$iThreadIdx]->join(); - &log(TRACE, "thread ${iThreadIdx} object undef"); - undef($oThread[$iThreadIdx]); - $iThreadComplete++; - } - } - } - - # Sleep before trying again - hsleep(.1); - } - - &log(DEBUG, 'all threads exited'); - - return true; -} - #################################################################################################################################### # BACKUP_REGEXP_GET - Generate a regexp depending on the backups that need to be found #################################################################################################################################### @@ -466,78 +314,54 @@ sub backup_file my $oBackupManifest = shift; # Manifest for the current backup # Variables used for parallel copy - my $lTablespaceIdx = 0; + my %oFileCopyMap; my $lFileTotal = 0; - my $lFileLargeSize = 0; - my $lFileLargeTotal = 0; - my $lFileSmallSize = 0; - my $lFileSmallTotal = 0; - - # Decide if all the paths will be created in advance - my $bPathCreate = $bHardLink || $strType eq BACKUP_TYPE_FULL; + my $lSizeTotal = 0; # Iterate through the path sections of the manifest to backup - foreach my $strSectionPath ($oBackupManifest->keys()) + foreach my $strPathKey ($oBackupManifest->keys(MANIFEST_SECTION_BACKUP_PATH)) { - # Skip non-path sections - if ($strSectionPath !~ /\:path$/ || $strSectionPath =~ /^backup\:path$/) - { - next; - } - # Determine the source and destination backup paths my $strBackupSourcePath; # Absolute path to the database base directory or tablespace to backup my $strBackupDestinationPath; # Relative path to the backup directory where the data will be stored my $strSectionFile; # Manifest section that contains the file data # Process the base database directory - if ($strSectionPath =~ /^base\:/) + if ($strPathKey =~ /^base$/) { - $lTablespaceIdx++; $strBackupSourcePath = $strDbClusterPath; $strBackupDestinationPath = 'base'; - $strSectionFile = 'base:file'; # Create the archive log directory $oFile->path_create(PATH_BACKUP_TMP, 'base/pg_xlog'); } # Process each tablespace - elsif ($strSectionPath =~ /^tablespace\:/) + elsif ($strPathKey =~ /^tablespace\:/) { - $lTablespaceIdx++; - my $strTablespaceName = (split(':', $strSectionPath))[1]; + my $strTablespaceName = (split(':', $strPathKey))[1]; $strBackupSourcePath = $oBackupManifest->get(MANIFEST_SECTION_BACKUP_TABLESPACE, $strTablespaceName, MANIFEST_SUBKEY_PATH); $strBackupDestinationPath = "tablespace/${strTablespaceName}"; $strSectionFile = "tablespace:${strTablespaceName}:file"; # Create the tablespace directory and link - if ($bPathCreate) + if ($bHardLink || $strType eq BACKUP_TYPE_FULL) { - $oFile->path_create(PATH_BACKUP_TMP, $strBackupDestinationPath); - - $oFile->link_create(PATH_BACKUP_TMP, ${strBackupDestinationPath}, - PATH_BACKUP_TMP, - 'base/pg_tblspc/' . $oBackupManifest->get(MANIFEST_SECTION_BACKUP_TABLESPACE, $strTablespaceName, - MANIFEST_SUBKEY_LINK), - false, true); + $oFile->link_create(PATH_BACKUP_TMP, $strBackupDestinationPath, + PATH_BACKUP_TMP, + 'base/pg_tblspc/' . $oBackupManifest->get(MANIFEST_SECTION_BACKUP_TABLESPACE, $strTablespaceName, + MANIFEST_SUBKEY_LINK), + false, true, true); } } else { - confess &log(ASSERT, "cannot find type for section ${strSectionPath}"); + confess &log(ASSERT, "cannot find type for path ${strPathKey}"); } - # Create all the sub paths if this is a full backup or hardlinks are requested - if ($bPathCreate) - { - foreach my $strPath ($oBackupManifest->keys($strSectionPath)) - { - $oFile->path_create(PATH_BACKUP_TMP, "${strBackupDestinationPath}/${strPath}", undef, true); - } - } + # Possible for the file section to exist with no files (i.e. empty tablespace) + $strSectionFile = "$strPathKey:file"; - # Possible for the path section to exist with no files (i.e. empty tablespace) if (!$oBackupManifest->test($strSectionFile)) { next; @@ -573,7 +397,7 @@ sub backup_file &log(DEBUG, "hard-linking ${strBackupSourceFile} from ${strReference}"); $oFile->link_create(PATH_BACKUP_CLUSTER, "${strReference}/${strBackupDestinationPath}/${strFile}", - PATH_BACKUP_TMP, "${strBackupDestinationPath}/${strFile}", true, false, !$bPathCreate); + PATH_BACKUP_TMP, "${strBackupDestinationPath}/${strFile}", true, false, true); } } # Else copy/compress the file and generate a checksum @@ -589,291 +413,164 @@ sub backup_file # Setup variables needed for threaded copy $lFileTotal++; - $lFileLargeSize += $lFileSize > $iSmallFileThreshold ? $lFileSize : 0; - $lFileLargeTotal += $lFileSize > $iSmallFileThreshold ? 1 : 0; - $lFileSmallSize += $lFileSize <= $iSmallFileThreshold ? $lFileSize : 0; - $lFileSmallTotal += $lFileSize <= $iSmallFileThreshold ? 1 : 0; + $lSizeTotal += $lFileSize; - # Load the hash used by threaded copy - my $strKey = sprintf('ts%012x-fs%012x-fn%012x', $lTablespaceIdx, - $lFileSize, $lFileTotal); - - $oFileCopyMap{"${strKey}"}{db_file} = $strBackupSourceFile; - $oFileCopyMap{"${strKey}"}{file_section} = $strSectionFile; - $oFileCopyMap{"${strKey}"}{file} = ${strFile}; - $oFileCopyMap{"${strKey}"}{backup_file} = "${strBackupDestinationPath}/${strFile}"; - $oFileCopyMap{"${strKey}"}{size} = $lFileSize; - $oFileCopyMap{"${strKey}"}{modification_time} = - $oBackupManifest->get($strSectionFile, $strFile, MANIFEST_SUBKEY_MODIFICATION_TIME); - $oFileCopyMap{"${strKey}"}{checksum_only} = $bProcessChecksumOnly; - $oFileCopyMap{"${strKey}"}{checksum} = + $oFileCopyMap{$strPathKey}{$strFile}{db_file} = $strBackupSourceFile; + $oFileCopyMap{$strPathKey}{$strFile}{file_section} = $strSectionFile; + $oFileCopyMap{$strPathKey}{$strFile}{file} = ${strFile}; + $oFileCopyMap{$strPathKey}{$strFile}{backup_file} = "${strBackupDestinationPath}/${strFile}"; + $oFileCopyMap{$strPathKey}{$strFile}{size} = $lFileSize; + $oFileCopyMap{$strPathKey}{$strFile}{checksum_only} = $bProcessChecksumOnly; + $oFileCopyMap{$strPathKey}{$strFile}{checksum} = $oBackupManifest->get($strSectionFile, $strFile, MANIFEST_SUBKEY_CHECKSUM, false); } } } - # Build the thread queues - $iThreadLocalMax = thread_init($iThreadMax); - &log(DEBUG, "actual threads ${iThreadLocalMax}/${iThreadMax}"); - - # Initialize the thread size array - my @oyThreadData; - - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) + # If there are no files to backup then we'll exit with a warning unless in test mode. The other way this could happen is if + # the database is down and backup is called with --no-start-stop twice in a row. + if ($lFileTotal == 0) { - $oyThreadData[$iThreadIdx]{size} = 0; - $oyThreadData[$iThreadIdx]{total} = 0; - $oyThreadData[$iThreadIdx]{large_size} = 0; - $oyThreadData[$iThreadIdx]{large_total} = 0; - $oyThreadData[$iThreadIdx]{small_size} = 0; - $oyThreadData[$iThreadIdx]{small_total} = 0; + if (!optionGet(OPTION_TEST)) + { + confess &log(WARN, "no files have changed since the last backup - this seems unlikely"); + } + + return; } - # Assign files to each thread queue - my $iThreadFileSmallIdx = 0; - my $iThreadFileSmallTotalMax = int($lFileSmallTotal / $iThreadLocalMax); + # Create backup and result queues + my $oResultQueue = Thread::Queue->new(); + my @oyBackupQueue; - my $iThreadFileLargeIdx = 0; - my $fThreadFileLargeSizeMax = $lFileLargeSize / $iThreadLocalMax; + # Variables used for local copy + my $lSizeCurrent = 0; # Running total of bytes copied + my $bCopied; # Was the file copied? + my $lCopySize; # Size reported by copy + my $strCopyChecksum; # Checksum reported by copy - &log(INFO, "file total ${lFileTotal}"); - &log(DEBUG, "file small total ${lFileSmallTotal}, small size: " . file_size_format($lFileSmallSize) . - ', small thread avg total ' . file_size_format(int($iThreadFileSmallTotalMax))); - &log(DEBUG, "file large total ${lFileLargeTotal}, large size: " . file_size_format($lFileLargeSize) . - ', large thread avg size ' . file_size_format(int($fThreadFileLargeSizeMax))); - - foreach my $strFile (sort (keys %oFileCopyMap)) + # Iterate all backup files + foreach my $strPathKey (sort (keys %oFileCopyMap)) { - my $lFileSize = $oFileCopyMap{"${strFile}"}{size}; - - if ($lFileSize > $iSmallFileThreshold) + if ($iThreadMax > 1) { - $oThreadQueue[$iThreadFileLargeIdx]->enqueue($strFile); - - $oyThreadData[$iThreadFileLargeIdx]{large_size} += $lFileSize; - $oyThreadData[$iThreadFileLargeIdx]{large_total}++; - $oyThreadData[$iThreadFileLargeIdx]{size} += $lFileSize; - - if ($oyThreadData[$iThreadFileLargeIdx]{large_size} >= $fThreadFileLargeSizeMax && - $iThreadFileLargeIdx < $iThreadLocalMax - 1) - { - $iThreadFileLargeIdx++; - } + $oyBackupQueue[@oyBackupQueue] = Thread::Queue->new(); } - else + + foreach my $strFile (sort (keys $oFileCopyMap{$strPathKey})) { - $oThreadQueue[$iThreadFileSmallIdx]->enqueue($strFile); + my $oFileCopy = $oFileCopyMap{$strPathKey}{$strFile}; - $oyThreadData[$iThreadFileSmallIdx]{small_size} += $lFileSize; - $oyThreadData[$iThreadFileSmallIdx]{small_total}++; - $oyThreadData[$iThreadFileSmallIdx]{size} += $lFileSize; - - if ($oyThreadData[$iThreadFileSmallIdx]{small_total} >= $iThreadFileSmallTotalMax && - $iThreadFileSmallIdx < $iThreadLocalMax - 1) + if ($iThreadMax > 1) { - $iThreadFileSmallIdx++; + $oyBackupQueue[@oyBackupQueue - 1]->enqueue($oFileCopy); + } + else + { + # Backup the file + ($bCopied, $lSizeCurrent, $lCopySize, $strCopyChecksum) = + backupFile($oFile, $$oFileCopy{db_file}, $$oFileCopy{backup_file}, $bCompress, + $$oFileCopy{checksum}, $$oFileCopy{checksum_only}, + $$oFileCopy{size}, $lSizeTotal, $lSizeCurrent); + + # If copy was successful store the checksum and size + if ($bCopied) + { + $oBackupManifest->set($$oFileCopy{file_section}, $$oFileCopy{file}, + MANIFEST_SUBKEY_SIZE, $lCopySize + 0); + + if ($lCopySize > 0) + { + $oBackupManifest->set($$oFileCopy{file_section}, $$oFileCopy{file}, + MANIFEST_SUBKEY_CHECKSUM, $strCopyChecksum); + } + } + # Else the file was removed during backup so remove from manifest + else + { + $oBackupManifest->remove($$oFileCopy{file_section}, $$oFileCopy{file}); + } } } } - if ($iThreadLocalMax > 1) + # If multi-threaded then create threads to copy files + if ($iThreadMax > 1) { - # End each thread queue and start the backup_file threads - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) + for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++) { - # Output info about how much work each thread is going to do - &log(DEBUG, "thread ${iThreadIdx} large total $oyThreadData[$iThreadIdx]{large_total}, " . - "size $oyThreadData[$iThreadIdx]{large_size}"); - &log(DEBUG, "thread ${iThreadIdx} small total $oyThreadData[$iThreadIdx]{small_total}, " . - "size $oyThreadData[$iThreadIdx]{small_size}"); + my %oParam; - # Start the thread - $oThread[$iThreadIdx] = threads->create(\&backup_file_thread, true, $iThreadIdx, !$bPathCreate, - $oyThreadData[$iThreadIdx]{size}, $oBackupManifest); + $oParam{compress} = $bCompress; + $oParam{size_total} = $lSizeTotal; + $oParam{queue} = \@oyBackupQueue; + $oParam{result_queue} = $oResultQueue; + + threadGroupRun($iThreadIdx, 'backup', \%oParam); } - # Wait for the threads to complete - backup_thread_complete($iThreadTimeout); + # Complete thread queues + threadGroupComplete(); # Read the messages that we passed back from the threads. These should be two types: # 1) remove - files that were skipped because they were removed from the database during backup # 2) checksum - file checksums calculated by the threads - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) + while (my $strMessage = $oResultQueue->dequeue_nb()) { - while (my $strMessage = $oMasterQueue[$iThreadIdx]->dequeue_nb()) + &log(TRACE, "message received in master queue: ${strMessage}"); + + # Split the message. Currently using | as the split character. Not ideal, but it will do for now. + my @strSplit = split(/\|/, $strMessage); + + my $strCommand = $strSplit[0]; # Command to execute on a file + my $strFileSection = $strSplit[1]; # File section where the file is located + my $strFile = $strSplit[2]; # The file to act on + + # These three parts are required + if (!defined($strCommand) || !defined($strFileSection) || !defined($strFile)) { - &log (DEBUG, "message received in master queue: ${strMessage}"); + confess &log(ASSERT, 'thread messages must have strCommand, strFileSection and strFile defined'); + } - # Split the message. Currently using | as the split character. Not ideal, but it will do for now. - my @strSplit = split(/\|/, $strMessage); + &log (DEBUG, "command = ${strCommand}, file_section = ${strFileSection}, file = ${strFile}"); - my $strCommand = $strSplit[0]; # Command to execute on a file - my $strFileSection = $strSplit[1]; # File section where the file is located - my $strFile = $strSplit[2]; # The file to act on + # If command is 'remove' then mark the skipped file in the manifest + if ($strCommand eq 'remove') + { + $oBackupManifest->remove($strFileSection, $strFile); - # These three parts are required - if (!defined($strCommand) || !defined($strFileSection) || !defined($strFile)) + &log (INFO, "removed file ${strFileSection}:${strFile} from the manifest (it was removed by db during backup)"); + } + # If command is 'checksum' then record the checksum in the manifest + elsif ($strCommand eq 'checksum') + { + my $strChecksum = $strSplit[3]; # File checksum calculated by the thread + my $lFileSize = $strSplit[4]; # File size calculated by the thread + + # Checksum must be defined + if (!defined($strChecksum)) { - confess &log(ASSERT, 'thread messages must have strCommand, strFileSection and strFile defined'); + confess &log(ASSERT, 'thread checksum messages must have strChecksum defined'); } - &log (DEBUG, "command = ${strCommand}, file_section = ${strFileSection}, file = ${strFile}"); - - # If command is 'remove' then mark the skipped file in the manifest - if ($strCommand eq 'remove') + # Checksum must be defined + if (!defined($lFileSize)) { - $oBackupManifest->remove($strFileSection, $strFile); - - &log (INFO, "removed file ${strFileSection}:${strFile} from the manifest (it was removed by db during backup)"); + confess &log(ASSERT, 'thread checksum messages must have lFileSize defined'); } - # If command is 'checksum' then record the checksum in the manifest - elsif ($strCommand eq 'checksum') + + $oBackupManifest->set($strFileSection, $strFile, MANIFEST_SUBKEY_SIZE, $lFileSize + 0); + + if ($lFileSize > 0) { - my $strChecksum = $strSplit[3]; # File checksum calculated by the thread - my $lFileSize = $strSplit[4]; # File size calculated by the thread - - # Checksum must be defined - if (!defined($strChecksum)) - { - confess &log(ASSERT, 'thread checksum messages must have strChecksum defined'); - } - - # Checksum must be defined - if (!defined($lFileSize)) - { - confess &log(ASSERT, 'thread checksum messages must have lFileSize defined'); - } - $oBackupManifest->set($strFileSection, $strFile, MANIFEST_SUBKEY_CHECKSUM, $strChecksum); - $oBackupManifest->set($strFileSection, $strFile, MANIFEST_SUBKEY_SIZE, $lFileSize + 0); - - # Log the checksum - &log (DEBUG, "write checksum ${strFileSection}:${strFile} into manifest: ${strChecksum} (${lFileSize})"); - } - } - } - } - else - { - &log(DEBUG, "starting backup in main process"); - backup_file_thread(false, 0, !$bPathCreate, $oyThreadData[0]{size}, $oBackupManifest); - } -} - -sub backup_file_thread -{ - my $bMulti = shift; # Is this thread one of many? - my $iThreadIdx = shift; # Defines the index of this thread - my $bPathCreate = shift; # Should paths be created automatically? - my $lSizeTotal = shift; # Total size of the files to be copied by this thread - my $oBackupManifest = shift; # Backup manifest object (only used when single-threaded) - - my $lSize = 0; # Size of files currently copied by this thread - my $strLog; # Store the log message - my $strLogProgress; # Part of the log message that shows progress - my $oFileThread; # Thread local file object - my $bCopyResult; # Copy result - my $strCopyChecksum; # Copy checksum - my $lCopySize; # Copy Size - - # If multi-threaded, then clone the file object - if ($bMulti) - { - $oFileThread = $oFile->clone($iThreadIdx); - } - else - { - $oFileThread = $oFile; - } - - # When a KILL signal is received, immediately abort - $SIG{'KILL'} = sub {threads->exit();}; - - # Iterate through all the files in this thread's queue to be copied from the database to the backup - while (my $strFile = $oThreadQueue[$iThreadIdx]->dequeue_nb()) - { - # Add the size of the current file to keep track of percent complete - $lSize += $oFileCopyMap{$strFile}{size}; - - if (!$oFileCopyMap{$strFile}{checksum_only}) - { - # Output information about the file to be copied - $strLog = "thread ${iThreadIdx} backing up file"; - - # Copy the file from the database to the backup (will return false if the source file is missing) - ($bCopyResult, $strCopyChecksum, $lCopySize) = - $oFileThread->copy(PATH_DB_ABSOLUTE, $oFileCopyMap{$strFile}{db_file}, - PATH_BACKUP_TMP, $oFileCopyMap{$strFile}{backup_file} . - ($bCompress ? '.' . $oFile->{strCompressExtension} : ''), - false, # Source is not compressed since it is the db directory - $bCompress, # Destination should be compressed based on backup settings - true, # Ignore missing files - $oFileCopyMap{$strFile}{modification_time}, # Set modification time - undef, # Do not set original mode - true); # Create the destination directory if it does not exist - - if (!$bCopyResult) - { - # If file is missing assume the database removed it (else corruption and nothing we can do!) - &log(INFO, "thread ${iThreadIdx} skipped file removed by database: " . $oFileCopyMap{$strFile}{db_file}); - - # Remove file from the manifest - if ($bMulti) - { - # Write a message into the master queue to have the file removed from the manifest - $oMasterQueue[$iThreadIdx]->enqueue("remove|$oFileCopyMap{$strFile}{file_section}|". - "$oFileCopyMap{$strFile}{file}"); - } - else - { - # remove it directly - $oBackupManifest->remove($oFileCopyMap{$strFile}{file_section}, $oFileCopyMap{$strFile}{file}); } - # Move on to the next file - next; + # Log the checksum + &log (DEBUG, "write checksum ${strFileSection}:${strFile} into manifest: ${strChecksum} (${lFileSize})"); } } - - $strLogProgress = "$oFileCopyMap{$strFile}{db_file} (" . file_size_format($lCopySize) . - ($lSizeTotal > 0 ? ', ' . int($lSize * 100 / $lSizeTotal) . '%' : '') . ')'; - - # Generate checksum for file if configured - if ($lCopySize != 0) - { - # Store checksum in the manifest - if ($bMulti) - { - # Write the checksum message into the master queue - $oMasterQueue[$iThreadIdx]->enqueue("checksum|$oFileCopyMap{$strFile}{file_section}|" . - "$oFileCopyMap{$strFile}{file}|${strCopyChecksum}|${lCopySize}"); - } - else - { - # Write it directly - $oBackupManifest->set($oFileCopyMap{$strFile}{file_section}, $oFileCopyMap{$strFile}{file}, - MANIFEST_SUBKEY_CHECKSUM, $strCopyChecksum); - $oBackupManifest->set($oFileCopyMap{$strFile}{file_section}, $oFileCopyMap{$strFile}{file}, - MANIFEST_SUBKEY_SIZE, $lCopySize + 0); - } - - # Output information about the file to be checksummed - if (!defined($strLog)) - { - $strLog = "thread ${iThreadIdx} checksum-only ${strLogProgress}"; - } - - &log(INFO, $strLog . " checksum ${strCopyChecksum}"); - } - else - { - &log(INFO, $strLog . ' ' . $strLogProgress); - } - - &log(TRACE, "thread waiting for new file from queue"); } - - &log(DEBUG, "thread ${iThreadIdx} exiting"); } #################################################################################################################################### @@ -1096,7 +793,7 @@ sub backup # If archive logs are required to complete the backup, then fetch them. This is the default, but can be overridden if the # archive logs are going to a different server. Be careful here because there is no way to verify that the backup will be - # consistent - at least not in this routine. + # consistent - at least not here. if (!optionGet(OPTION_NO_START_STOP) && optionGet(OPTION_BACKUP_ARCHIVE_CHECK)) { # Save the backup manifest a second time - before getting archive logs in case that fails diff --git a/lib/BackRest/BackupFile.pm b/lib/BackRest/BackupFile.pm new file mode 100644 index 000000000..79d0b15fc --- /dev/null +++ b/lib/BackRest/BackupFile.pm @@ -0,0 +1,133 @@ +#################################################################################################################################### +# BACKUP FILE MODULE +#################################################################################################################################### +package BackRest::BackupFile; + +use threads; +use strict; +use Thread::Queue; +use warnings FATAL => qw(all); +use Carp qw(confess); + +use File::Basename qw(dirname); +# use File::Path qw(remove_tree); +# use Scalar::Util qw(looks_like_number); +# use Fcntl 'SEEK_CUR'; +use Exporter qw(import); + +use lib dirname($0); +use BackRest::Utility; +use BackRest::Exception; +use BackRest::Manifest; +use BackRest::File; + +#################################################################################################################################### +# backupThread +#################################################################################################################################### +sub backupFile +{ + my $oFile = shift; # File object + my $strSourceFile = shift; # Source file to backup + my $strDestinationFile = shift; # Destination backup file + my $bDestinationCompress = shift; # Compress destination file + my $strChecksum = shift; # File checksum to be checked + my $bChecksumOnly = shift; # Checksum destination only + my $lSizeFile = shift; # Total size of the files to be copied + my $lSizeTotal = shift; # Total size of the files to be copied + my $lSizeCurrent = shift; # Size of files copied so far + + my $strLog; # Store the log message + my $strLogProgress; # Part of the log message that shows progress + my $bCopyResult; # Copy result + my $strCopyChecksum; # Copy checksum + my $lCopySize; # Copy Size + + # Add the size of the current file to keep track of percent complete + $lSizeCurrent += $lSizeFile; + + if ($bChecksumOnly) + { + $lCopySize = $lSizeFile; + $strCopyChecksum = 'dude'; + # !!! Need to put checksum code in here + } + else + { + # Output information about the file to be copied + $strLog = "backed up file"; + + # Copy the file from the database to the backup (will return false if the source file is missing) + ($bCopyResult, $strCopyChecksum, $lCopySize) = + $oFile->copy(PATH_DB_ABSOLUTE, $strSourceFile, + PATH_BACKUP_TMP, $strDestinationFile . + ($bDestinationCompress ? '.' . $oFile->{strCompressExtension} : ''), + false, # Source is not compressed since it is the db directory + $bDestinationCompress, # Destination should be compressed based on backup settings + true, # Ignore missing files + undef, # Do not set original modification time + undef, # Do not set original mode + true); # Create the destination directory if it does not exist + + if (!$bCopyResult) + { + # If file is missing assume the database removed it (else corruption and nothing we can do!) + &log(INFO, "skipped file removed by database: " . $strSourceFile); + + # # Remove file from the manifest + # if ($bMulti) + # { + # # Write a message into the master queue to have the file removed from the manifest + # $oMasterQueue[$iThreadIdx]->enqueue("remove|$oFileCopyMap{$strFile}{file_section}|". + # "$oFileCopyMap{$strFile}{file}"); + # } + # else + # { + # # remove it directly + # $oBackupManifest->remove($oFileCopyMap{$strFile}{file_section}, $oFileCopyMap{$strFile}{file}); + # } + + return false, $lSizeCurrent, undef, undef; + } + } + + $strLogProgress = "$strSourceFile (" . file_size_format($lCopySize) . + ($lSizeTotal > 0 ? ', ' . int($lSizeCurrent * 100 / $lSizeTotal) . '%' : '') . ')'; + + # Generate checksum for file if configured + if ($lCopySize != 0) + { + # # Store checksum in the manifest + # if ($bMulti) + # { + # # Write the checksum message into the master queue + # $oMasterQueue[$iThreadIdx]->enqueue("checksum|$oFileCopyMap{$strFile}{file_section}|" . + # "$oFileCopyMap{$strFile}{file}|${strCopyChecksum}|${lCopySize}"); + # } + # else + # { + # # Write it directly + # $oBackupManifest->set($oFileCopyMap{$strFile}{file_section}, $oFileCopyMap{$strFile}{file}, + # MANIFEST_SUBKEY_CHECKSUM, $strCopyChecksum); + # $oBackupManifest->set($oFileCopyMap{$strFile}{file_section}, $oFileCopyMap{$strFile}{file}, + # MANIFEST_SUBKEY_SIZE, $lCopySize + 0); + # } + + # Output information about the file to be checksummed + if (!defined($strLog)) + { + $strLog = "checksum-only ${strLogProgress}"; + } + + &log(INFO, $strLog . " checksum ${strCopyChecksum}"); + } + else + { + &log(INFO, $strLog . ' ' . $strLogProgress); + } + + return true, $lSizeCurrent, $lCopySize, $strCopyChecksum; +} + +our @EXPORT = qw(backupFile); + +1; diff --git a/lib/BackRest/Config.pm b/lib/BackRest/Config.pm index a6047e4f6..ba1d9df91 100644 --- a/lib/BackRest/Config.pm +++ b/lib/BackRest/Config.pm @@ -19,7 +19,7 @@ use BackRest::Utility; # Export functions #################################################################################################################################### our @EXPORT = qw(configLoad optionGet optionTest optionRuleGet optionRequired optionDefault operationGet operationTest - operationSet optionRemoteType optionRemoteTypeTest optionRemote optionRemoteTest); + operationSet optionRemoteType optionRemoteTypeTest optionRemote optionRemoteTest remoteDestroy); #################################################################################################################################### # DB/BACKUP Constants @@ -1601,6 +1601,7 @@ sub optionRemoteTypeTest sub optionRemote { my $bForceLocal = shift; + my $bStore = shift; # If force local or remote = NONE then create a local remote and return it if ((defined($bForceLocal) && $bForceLocal) || optionRemoteTypeTest(NONE)) @@ -1621,7 +1622,7 @@ sub optionRemote } # Return the remote when required - $oRemote = new BackRest::Remote + my $oRemoteTemp = new BackRest::Remote ( optionRemoteTypeTest(DB) ? optionGet(OPTION_DB_HOST) : optionGet(OPTION_BACKUP_HOST), optionRemoteTypeTest(DB) ? optionGet(OPTION_DB_USER) : optionGet(OPTION_BACKUP_USER), @@ -1633,7 +1634,25 @@ sub optionRemote operationTest(OP_EXPIRE) ? OPTION_DEFAULT_COMPRESS_LEVEL_NETWORK : optionGet(OPTION_COMPRESS_LEVEL_NETWORK) ); - return $oRemote; + if ($bStore) + { + $oRemote = $oRemoteTemp; + } + + return $oRemoteTemp; +} + +#################################################################################################################################### +# remoteDestroy +# +# Undefined the remote if it is stored locally. +#################################################################################################################################### +sub remoteDestroy +{ + if (defined($oRemote)) + { + undef($oRemote); + } } #################################################################################################################################### diff --git a/lib/BackRest/File.pm b/lib/BackRest/File.pm index 3ba8b26d0..e77fadbfd 100644 --- a/lib/BackRest/File.pm +++ b/lib/BackRest/File.pm @@ -392,6 +392,7 @@ sub link_create &log(DEBUG, "${strOperation}: ${strDebug}"); # If the destination path is backup and does not exist, create it + # !!! This should only happen when the link create errors if ($bPathCreate && $self->path_type_get($strDestinationPathType) eq PATH_BACKUP) { $self->path_create(PATH_BACKUP_ABSOLUTE, dirname($strDestination)); @@ -491,30 +492,31 @@ sub move { if (!rename($strPathOpSource, $strPathOpDestination)) { - my $strError = "${strPathOpDestination} could not be moved: " . $!; - my $iErrorCode = COMMAND_ERR_FILE_READ; - - if (!$self->exists(PATH_ABSOLUTE, dirname($strPathOpDestination))) + if ($bDestinationPathCreate) { - $strError = "${strPathOpDestination} does not exist"; - $iErrorCode = COMMAND_ERR_FILE_MISSING; + $self->path_create(PATH_ABSOLUTE, dirname($strPathOpDestination), undef, true); } - if (!($bDestinationPathCreate && $iErrorCode == COMMAND_ERR_FILE_MISSING)) + if (!$bDestinationPathCreate || !rename($strPathOpSource, $strPathOpDestination)) { - if ($strSourcePathType eq PATH_ABSOLUTE) + my $strError = "unable to move file ${strPathOpSource} to ${strPathOpDestination}: " . $!; + my $iErrorCode = COMMAND_ERR_FILE_READ; + + if (!$self->exists(PATH_ABSOLUTE, dirname($strPathOpDestination))) { - confess &log(ERROR, $strError, $iErrorCode); + $strError = "${strPathOpDestination} does not exist"; + $iErrorCode = COMMAND_ERR_FILE_MISSING; } - confess &log(ERROR, "${strDebug}: " . $strError); - } + if (!($bDestinationPathCreate && $iErrorCode == COMMAND_ERR_FILE_MISSING)) + { + if ($strSourcePathType eq PATH_ABSOLUTE) + { + confess &log(ERROR, $strError, $iErrorCode); + } - $self->path_create(PATH_ABSOLUTE, dirname($strPathOpDestination)); - - if (!rename($strPathOpSource, $strPathOpDestination)) - { - confess &log(ERROR, "unable to move file ${strPathOpSource}: " . $!); + confess &log(ERROR, "${strDebug}: " . $strError); + } } } } @@ -1394,30 +1396,31 @@ sub copy # Open the destination temp file if (!sysopen($hDestinationFile, $strDestinationTmpOp, O_WRONLY | O_CREAT)) { - my $strError = "${strDestinationTmpOp} could not be opened: " . $!; - my $iErrorCode = COMMAND_ERR_FILE_READ; - - if (!$self->exists(PATH_ABSOLUTE, dirname($strDestinationTmpOp))) + if ($bDestinationPathCreate) { - $strError = dirname($strDestinationTmpOp) . ' does not exist'; - $iErrorCode = COMMAND_ERR_FILE_MISSING; + $self->path_create(PATH_ABSOLUTE, dirname($strDestinationTmpOp), undef, true); } - if (!($bDestinationPathCreate && $iErrorCode == COMMAND_ERR_FILE_MISSING)) + if (!$bDestinationPathCreate || !sysopen($hDestinationFile, $strDestinationTmpOp, O_WRONLY | O_CREAT)) { - if ($strSourcePathType eq PATH_ABSOLUTE) + my $strError = "unable to open ${strDestinationTmpOp}: " . $!; + my $iErrorCode = COMMAND_ERR_FILE_READ; + + if (!$self->exists(PATH_ABSOLUTE, dirname($strDestinationTmpOp))) { - confess &log(ERROR, $strError, $iErrorCode); + $strError = dirname($strDestinationTmpOp) . ' does not exist'; + $iErrorCode = COMMAND_ERR_FILE_MISSING; } - confess &log(ERROR, "${strDebug}: " . $strError); - } + if (!($bDestinationPathCreate && $iErrorCode == COMMAND_ERR_FILE_MISSING)) + { + if ($strSourcePathType eq PATH_ABSOLUTE) + { + confess &log(ERROR, $strError, $iErrorCode); + } - $self->path_create(PATH_ABSOLUTE, dirname($strDestinationTmpOp)); - - if (!sysopen($hDestinationFile, $strDestinationTmpOp, O_WRONLY | O_CREAT | O_EXCL)) - { - confess &log(ERROR, "unable to open destination file ${strDestinationOp}: " . $!); + confess &log(ERROR, "${strDebug}: " . $strError); + } } } } @@ -1712,7 +1715,7 @@ sub copy } # Move the file from tmp to final destination - $self->move(PATH_ABSOLUTE, $strDestinationTmpOp, PATH_ABSOLUTE, $strDestinationOp, true); + $self->move(PATH_ABSOLUTE, $strDestinationTmpOp, PATH_ABSOLUTE, $strDestinationOp, $bDestinationPathCreate); } return $bResult, $strChecksum, $iFileSize; diff --git a/lib/BackRest/Remote.pm b/lib/BackRest/Remote.pm index 5e9e23af0..7689e6cbf 100644 --- a/lib/BackRest/Remote.pm +++ b/lib/BackRest/Remote.pm @@ -82,6 +82,7 @@ sub new master_opts => [-o => $strOptionSSHCompression, -o => $strOptionSSHRequestTTY]); $self->{oSSH}->error and confess &log(ERROR, "unable to connect to $self->{strHost}: " . $self->{oSSH}->error); + &log(TRACE, 'connected to remote ssh host ' . $self->{strHost}); # Execute remote command ($self->{hIn}, $self->{hOut}, $self->{hErr}, $self->{pId}) = $self->{oSSH}->open3($self->{strCommand}); @@ -120,6 +121,29 @@ sub new return $self; } + +#################################################################################################################################### +# DESTROY +#################################################################################################################################### +sub DESTROY +{ + my $self = shift; + + # Only send the exit command if the process is running + if (defined($self->{pId})) + { + &log(TRACE, "sending exit command to process"); + $self->command_write('exit'); + + # &log(TRACE, "waiting for remote process"); + # if (!$self->wait_pid(5, false)) + # { + # &log(TRACE, "killed remote process"); + # kill('KILL', $self->{pId}); + # } + } +} + #################################################################################################################################### # repoPath #################################################################################################################################### @@ -389,27 +413,70 @@ sub write_line sub wait_pid { my $self = shift; + my $fWaitTime = shift; + my $bReportError = shift; - if (defined($self->{pId}) && waitpid($self->{pId}, WNOHANG) != 0) + # Record the start time and set initial sleep interval + my $fStartTime = defined($fWaitTime) ? gettimeofday() : undef; + my $fSleep = defined($fWaitTime) ? .1 : undef; + + if (defined($self->{pId})) { - my $strError = 'no error on stderr'; - - if (!defined($self->{hErr})) + do { - $strError = 'no error captured because stderr is already closed'; - } - else - { - $strError = $self->pipe_to_string($self->{hErr}); - } + my $iResult = waitpid($self->{pId}, WNOHANG); - $self->{pId} = undef; - $self->{hIn} = undef; - $self->{hOut} = undef; - $self->{hErr} = undef; + if (defined($fWaitTime)) + { + confess &log(TRACE, "waitpid result = $iResult"); + } - confess &log(ERROR, "remote process terminated: ${strError}"); + # If there is no such process + if ($iResult == -1) + { + return true; + } + + if ($iResult > 0) + { + if (!defined($bReportError) || $bReportError) + { + my $strError = 'no error on stderr'; + + if (!defined($self->{hErr})) + { + $strError = 'no error captured because stderr is already closed'; + } + else + { + $strError = $self->pipe_to_string($self->{hErr}); + } + + $self->{pId} = undef; + $self->{hIn} = undef; + $self->{hOut} = undef; + $self->{hErr} = undef; + + confess &log(ERROR, "remote process terminated: ${strError}"); + } + + return true; + } + + &log(TRACE, "waiting for pid"); + + # If waiting then sleep before trying again + if (defined($fWaitTime)) + { + hsleep($fSleep); + $fSleep = $fSleep * 2 < $fWaitTime - (gettimeofday() - $fStartTime) ? + $fSleep * 2 : ($fWaitTime - (gettimeofday() - $fStartTime)) + .001; + } + } + while (defined($fWaitTime) && (gettimeofday() - $fStartTime) < $fWaitTime); } + + return false; } #################################################################################################################################### diff --git a/lib/BackRest/Restore.pm b/lib/BackRest/Restore.pm index 1fc3ae52a..fcd6c5ec5 100644 --- a/lib/BackRest/Restore.pm +++ b/lib/BackRest/Restore.pm @@ -17,6 +17,7 @@ use lib dirname($0); use BackRest::Exception; use BackRest::Utility; use BackRest::ThreadGroup; +use BackRest::RestoreFile; use BackRest::Config; use BackRest::Manifest; use BackRest::File; @@ -566,20 +567,46 @@ sub restore # Build paths/links in the restore paths $self->build($oManifest); - # Create thread queues + # Get variables required for restore + my $lCopyTimeBegin = $oManifest->epoch(MANIFEST_SECTION_BACKUP, MANIFEST_KEY_TIMESTAMP_COPY_START); + my $bSourceCompression = $oManifest->get(MANIFEST_SECTION_BACKUP_OPTION, MANIFEST_KEY_COMPRESS) eq 'y' ? true : false; + my $strCurrentUser = getpwuid($<); + my $strCurrentGroup = getgrgid($(); + + # Create thread queues (or do restore if single-threaded) my @oyRestoreQueue; + if ($self->{iThreadTotal} > 1) + { + &log(TRACE, "building thread queues"); + } + else + { + &log(TRACE, "starting restore in main process"); + } + foreach my $strPathKey ($oManifest->keys(MANIFEST_SECTION_BACKUP_PATH)) { my $strSection = "${strPathKey}:file"; if ($oManifest->test($strSection)) { - $oyRestoreQueue[@oyRestoreQueue] = Thread::Queue->new(); + if ($self->{iThreadTotal} > 1) + { + $oyRestoreQueue[@oyRestoreQueue] = Thread::Queue->new(); + } foreach my $strName ($oManifest->keys($strSection)) { - $oyRestoreQueue[@oyRestoreQueue - 1]->enqueue("${strPathKey}|${strName}"); + if ($self->{iThreadTotal} > 1) + { + $oyRestoreQueue[@oyRestoreQueue - 1]->enqueue("${strPathKey}|${strName}"); + } + else + { + restoreFile($strPathKey, $strName, $lCopyTimeBegin, $self->{bDelta}, $self->{bForce}, $self->{strBackupPath}, + $bSourceCompression, $strCurrentUser, $strCurrentGroup, $oManifest, $self->{oFile}); + } } } } @@ -587,179 +614,29 @@ sub restore # If multi-threaded then create threads to copy files if ($self->{iThreadTotal} > 1) { - # Create threads to process the thread queues - my $oThreadGroup = thread_group_create(); - for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++) { - &log(DEBUG, "starting restore thread ${iThreadIdx}"); - thread_group_add($oThreadGroup, threads->create(\&restore_thread, $self, true, - $iThreadIdx, \@oyRestoreQueue, $oManifest)); + my %oParam; + + $oParam{copy_time_begin} = $lCopyTimeBegin; + $oParam{delta} = $self->{bDelta}; + $oParam{force} = $self->{bForce}; + $oParam{backup_path} = $self->{strBackupPath}; + $oParam{source_compression} = $bSourceCompression; + $oParam{current_user} = $strCurrentUser; + $oParam{current_group} = $strCurrentGroup; + $oParam{queue} = \@oyRestoreQueue; + $oParam{manifest} = $oManifest; + + threadGroupRun($iThreadIdx, 'restore', \%oParam); } # Complete thread queues - thread_group_complete($oThreadGroup); - } - # Else copy in the main process - else - { - &log(DEBUG, "starting restore in main process"); - $self->restore_thread(false, 0, \@oyRestoreQueue, $oManifest); + threadGroupComplete(); } # Create recovery.conf file $self->recovery(); } -#################################################################################################################################### -# RESTORE_THREAD -# -# Worker threads for the restore process. -#################################################################################################################################### -sub restore_thread -{ - my $self = shift; # Class hash - my $bMulti = shift; # Is this thread one of many? - my $iThreadIdx = shift; # Defines the index of this thread - my $oyRestoreQueueRef = shift; # Restore queues - my $oManifest = shift; # Backup manifest - - my $iDirection = $iThreadIdx % 2 == 0 ? 1 : -1; # Size of files currently copied by this thread - my $oFileThread; # Thread local file object - - # If multi-threaded, then clone the file object - if ($bMulti) - { - $oFileThread = $self->{oFile}->clone($iThreadIdx); - } - # Else use the master file object - else - { - $oFileThread = $self->{oFile}; - } - - # Initialize the starting and current queue index based in the total number of threads in relation to this thread - my $iQueueStartIdx = int((@{$oyRestoreQueueRef} / $self->{iThreadTotal}) * $iThreadIdx); - my $iQueueIdx = $iQueueStartIdx; - - # Time when the backup copying began - used for size/timestamp deltas - my $lCopyTimeBegin = $oManifest->epoch(MANIFEST_SECTION_BACKUP, MANIFEST_KEY_TIMESTAMP_COPY_START); - - # Set source compression - my $bSourceCompression = $oManifest->get(MANIFEST_SECTION_BACKUP_OPTION, MANIFEST_KEY_COMPRESS) eq 'y' ? true : false; - - # When a KILL signal is received, immediately abort - $SIG{'KILL'} = sub {threads->exit();}; - - # Get the current user and group to compare with stored mode - my $strCurrentUser = getpwuid($<); - my $strCurrentGroup = getgrgid($(); - - # Loop through all the queues to restore files (exit when the original queue is reached - do - { - while (my $strMessage = ${$oyRestoreQueueRef}[$iQueueIdx]->dequeue_nb()) - { - my $strSourcePath = (split(/\|/, $strMessage))[0]; # Source path from backup - my $strSection = "${strSourcePath}:file"; # Backup section with file info - my $strDestinationPath = $oManifest->get(MANIFEST_SECTION_BACKUP_PATH, # Destination path stored in manifest - $strSourcePath); - $strSourcePath =~ s/\:/\//g; # Replace : with / in source path - my $strName = (split(/\|/, $strMessage))[1]; # Name of file to be restored - - # If the file is a reference to a previous backup and hardlinks are off, then fetch it from that backup - my $strReference = $oManifest->test(MANIFEST_SECTION_BACKUP_OPTION, MANIFEST_KEY_HARDLINK, undef, 'y') ? undef : - $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_REFERENCE, false); - - # Generate destination file name - my $strDestinationFile = $oFileThread->path_get(PATH_DB_ABSOLUTE, "${strDestinationPath}/${strName}"); - - if ($oFileThread->exists(PATH_DB_ABSOLUTE, $strDestinationFile)) - { - # Perform delta if requested - if ($self->{bDelta}) - { - # If force then use size/timestamp delta - if ($self->{bForce}) - { - my $oStat = lstat($strDestinationFile); - - # Make sure that timestamp/size are equal and that timestamp is before the copy start time of the backup - if (defined($oStat) && - $oStat->size == $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_SIZE) && - $oStat->mtime == $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_MODIFICATION_TIME) && - $oStat->mtime < $lCopyTimeBegin) - { - &log(DEBUG, "${strDestinationFile} exists and matches size " . $oStat->size . - " and modification time " . $oStat->mtime); - next; - } - } - else - { - my ($strChecksum, $lSize) = $oFileThread->hash_size(PATH_DB_ABSOLUTE, $strDestinationFile); - - if (($lSize == $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_SIZE) && $lSize == 0) || - ($strChecksum eq $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_CHECKSUM))) - { - &log(DEBUG, "${strDestinationFile} exists and is zero size or matches backup checksum"); - - # Even if hash is the same set the time back to backup time. This helps with unit testing, but also - # presents a pristine version of the database. - utime($oManifest->get($strSection, $strName, MANIFEST_SUBKEY_MODIFICATION_TIME), - $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_MODIFICATION_TIME), - $strDestinationFile) - or confess &log(ERROR, "unable to set time for ${strDestinationFile}"); - - next; - } - } - } - - $oFileThread->remove(PATH_DB_ABSOLUTE, $strDestinationFile); - } - - # Set user and group if running as root (otherwise current user and group will be used for restore) - # Copy the file from the backup to the database - my ($bCopyResult, $strCopyChecksum, $lCopySize) = - $oFileThread->copy(PATH_BACKUP_CLUSTER, (defined($strReference) ? $strReference : $self->{strBackupPath}) . - "/${strSourcePath}/${strName}" . - ($bSourceCompression ? '.' . $oFileThread->{strCompressExtension} : ''), - PATH_DB_ABSOLUTE, $strDestinationFile, - $bSourceCompression, # Source is compressed based on backup settings - undef, undef, - $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_MODIFICATION_TIME), - $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_MODE), - undef, - $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_USER), - $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_GROUP)); - - if ($lCopySize != 0 && $strCopyChecksum ne $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_CHECKSUM)) - { - confess &log(ERROR, "error restoring ${strDestinationFile}: actual checksum ${strCopyChecksum} " . - "does not match expected checksum " . - $oManifest->get($strSection, $strName, MANIFEST_SUBKEY_CHECKSUM), ERROR_CHECKSUM); - } - } - - # Even number threads move up when they have finished a queue, odd numbered threads move down - $iQueueIdx += $iDirection; - - # Reset the queue index when it goes over or under the number of queues - if ($iQueueIdx < 0) - { - $iQueueIdx = @{$oyRestoreQueueRef} - 1; - } - elsif ($iQueueIdx >= @{$oyRestoreQueueRef}) - { - $iQueueIdx = 0; - } - - &log(TRACE, "thread waiting for new file from queue: queue ${iQueueIdx}, start queue ${iQueueStartIdx}"); - } - while ($iQueueIdx != $iQueueStartIdx); - - &log(DEBUG, "thread ${iThreadIdx} exiting"); -} - 1; diff --git a/lib/BackRest/RestoreFile.pm b/lib/BackRest/RestoreFile.pm new file mode 100644 index 000000000..bbcbed892 --- /dev/null +++ b/lib/BackRest/RestoreFile.pm @@ -0,0 +1,125 @@ +#################################################################################################################################### +# RESTORE FILE MODULE +#################################################################################################################################### +package BackRest::RestoreFile; + +use threads; +use threads::shared; +use Thread::Queue; +use strict; +use warnings FATAL => qw(all); +use Carp qw(confess); + +use File::Basename qw(dirname); +use File::stat qw(lstat); +use Exporter qw(import); + +use lib dirname($0); +use BackRest::Exception; +use BackRest::Utility; +use BackRest::Config; +use BackRest::Manifest; +use BackRest::File; + +#################################################################################################################################### +# restoreFile +# +# Restores a single file. +#################################################################################################################################### +sub restoreFile +{ + my $strSourcePath = shift; # Source path of the file + my $strFileName = shift; # File to restore + my $lCopyTimeBegin = shift; # Time that the backup begain - used for size/timestamp deltas + my $bDelta = shift; # Is restore a delta? + my $bForce = shift; # Force flag + my $strBackupPath = shift; # Backup path + my $bSourceCompression = shift; # Is the source compressed? + my $strCurrentUser = shift; # Current OS user + my $strCurrentGroup = shift; # Current OS group + my $oManifest = shift; # Backup manifest + my $oFile = shift; # File object (only provided in single-threaded mode) + + my $strSection = "${strSourcePath}:file"; # Backup section with file info + my $strDestinationPath = $oManifest->get(MANIFEST_SECTION_BACKUP_PATH, # Destination path stored in manifest + $strSourcePath); + $strSourcePath =~ s/\:/\//g; # Replace : with / in source path + + # If the file is a reference to a previous backup and hardlinks are off, then fetch it from that backup + my $strReference = $oManifest->test(MANIFEST_SECTION_BACKUP_OPTION, MANIFEST_KEY_HARDLINK, undef, 'y') ? undef : + $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_REFERENCE, false); + + # Generate destination file name + my $strDestinationFile = $oFile->path_get(PATH_DB_ABSOLUTE, "${strDestinationPath}/${strFileName}"); + + if ($oFile->exists(PATH_DB_ABSOLUTE, $strDestinationFile)) + { + # Perform delta if requested + if ($bDelta) + { + # If force then use size/timestamp delta + if ($bForce) + { + my $oStat = lstat($strDestinationFile); + + # Make sure that timestamp/size are equal and that timestamp is before the copy start time of the backup + if (defined($oStat) && + $oStat->size == $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_SIZE) && + $oStat->mtime == $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_MODIFICATION_TIME) && + $oStat->mtime < $lCopyTimeBegin) + { + &log(DEBUG, "${strDestinationFile} exists and matches size " . $oStat->size . + " and modification time " . $oStat->mtime); + return; + } + } + else + { + my ($strChecksum, $lSize) = $oFile->hash_size(PATH_DB_ABSOLUTE, $strDestinationFile); + + if (($lSize == $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_SIZE) && $lSize == 0) || + ($strChecksum eq $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_CHECKSUM))) + { + &log(DEBUG, "${strDestinationFile} exists and is zero size or matches backup checksum"); + + # Even if hash is the same set the time back to backup time. This helps with unit testing, but also + # presents a pristine version of the database. + utime($oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_MODIFICATION_TIME), + $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_MODIFICATION_TIME), + $strDestinationFile) + or confess &log(ERROR, "unable to set time for ${strDestinationFile}"); + + return; + } + } + } + + $oFile->remove(PATH_DB_ABSOLUTE, $strDestinationFile); + } + + # Set user and group if running as root (otherwise current user and group will be used for restore) + # Copy the file from the backup to the database + my ($bCopyResult, $strCopyChecksum, $lCopySize) = + $oFile->copy(PATH_BACKUP_CLUSTER, (defined($strReference) ? $strReference : $strBackupPath) . + "/${strSourcePath}/${strFileName}" . + ($bSourceCompression ? '.' . $oFile->{strCompressExtension} : ''), + PATH_DB_ABSOLUTE, $strDestinationFile, + $bSourceCompression, # Source is compressed based on backup settings + undef, undef, + $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_MODIFICATION_TIME), + $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_MODE), + undef, + $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_USER), + $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_GROUP)); + + if ($lCopySize != 0 && $strCopyChecksum ne $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_CHECKSUM)) + { + confess &log(ERROR, "error restoring ${strDestinationFile}: actual checksum ${strCopyChecksum} " . + "does not match expected checksum " . + $oManifest->get($strSection, $strFileName, MANIFEST_SUBKEY_CHECKSUM), ERROR_CHECKSUM); + } +} + +our @EXPORT = qw(restoreFile); + +1; diff --git a/lib/BackRest/ThreadGroup.pm b/lib/BackRest/ThreadGroup.pm index 4e8cd85d1..609e4c43e 100644 --- a/lib/BackRest/ThreadGroup.pm +++ b/lib/BackRest/ThreadGroup.pm @@ -12,50 +12,257 @@ use File::Basename; use lib dirname($0) . '/../lib'; use BackRest::Utility; +use BackRest::Config; +use BackRest::RestoreFile; +use BackRest::BackupFile; #################################################################################################################################### # MODULE EXPORTS #################################################################################################################################### use Exporter qw(import); -our @EXPORT = qw(thread_group_create thread_group_add thread_group_complete); +our @EXPORT = qw(threadGroupCreate threadGroupRun threadGroupComplete threadGroupDestroy); + +my @oyThread; +my @oyMessageQueue; +my @oyCommandQueue; +my @oyResultQueue; +my @byThreadRunning; #################################################################################################################################### -# CONSTRUCTOR +# threadGroupCreate #################################################################################################################################### -sub thread_group_create +sub threadGroupCreate { - # Create the class hash - my $self = {}; + # If thread-max is not defined then this operation does not use threads + if (!optionTest(OPTION_THREAD_MAX)) + { + return; + } - # Initialize variables - $self->{iThreadTotal} = 0; + # Get thread-max + my $iThreadMax = optionGet(OPTION_THREAD_MAX); - return $self; + # Only create threads when thread-max > 1 + if ($iThreadMax > 1) + { + for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++) + { + push @oyCommandQueue, Thread::Queue->new(); + push @oyMessageQueue, Thread::Queue->new(); + push @oyResultQueue, Thread::Queue->new(); + push @oyThread, (threads->create(\&threadGroupThread, $iThreadIdx)); + push @byThreadRunning, false; + } + } } #################################################################################################################################### -# ADD -# -# Add a thread to the group. Once a thread is added, it can be tracked as part of the group. +# threadGroupThread #################################################################################################################################### -sub thread_group_add +sub threadGroupThread { - my $self = shift; - my $oThread = shift; + my $iThreadIdx = shift; - $self->{oyThread}[$self->{iThreadTotal}] = $oThread; - $self->{iThreadTotal}++; + # When a KILL signal is received, immediately abort + $SIG{'KILL'} = sub {threads->exit();}; - return $self->{iThreadTotal} - 1; + while (my $oCommand = $oyCommandQueue[$iThreadIdx]->dequeue()) + { + # Exit thread + if ($$oCommand{function} eq 'exit') + { + &log(TRACE, 'thread terminated'); + return; + } + + &log(TRACE, "$$oCommand{function} thread started"); + + # Create a file object + my $oFile = new BackRest::File + ( + optionGet(OPTION_STANZA), + optionRemoteTypeTest(BACKUP) ? optionGet(OPTION_REPO_REMOTE_PATH) : optionGet(OPTION_REPO_PATH), + optionRemoteType(), + optionRemote(undef, false), + undef, undef, + $iThreadIdx + 1 + ); + + # Notify parent that init is complete + threadMessage($oyResultQueue[$iThreadIdx], 'init'); + + my $iDirection = $iThreadIdx % 2 == 0 ? 1 : -1; # Size of files currently copied by this thread + + # Initialize the starting and current queue index based in the total number of threads in relation to this thread + my $iQueueStartIdx = int((@{$$oCommand{param}{queue}} / $$oCommand{thread_total}) * $iThreadIdx); + my $iQueueIdx = $iQueueStartIdx; + + # Keep track of progress + my $lSizeCurrent = 0; # Running total of bytes copied + + # Loop through all the queues (exit when the original queue is reached) + do + { + &log(TRACE, "reading queue ${iQueueIdx}, start queue ${iQueueStartIdx}"); + + while (my $oMessage = ${$$oCommand{param}{queue}}[$iQueueIdx]->dequeue_nb()) + { + if ($$oCommand{function} eq 'restore') + { + my $strSourcePath = (split(/\|/, $oMessage))[0]; + my $strFileName = (split(/\|/, $oMessage))[1]; + + restoreFile($strSourcePath, $strFileName, $$oCommand{param}{copy_time_begin}, $$oCommand{param}{delta}, + $$oCommand{param}{force}, $$oCommand{param}{backup_path}, $$oCommand{param}{source_compression}, + $$oCommand{param}{current_user}, $$oCommand{param}{current_group}, $$oCommand{param}{manifest}, + $oFile); + } + elsif ($$oCommand{function} eq 'backup') + { + my $bCopied; # Was the file copied? + my $lCopySize; # Size reported by copy + my $strCopyChecksum; # Checksum reported by copy + + # Backup the file + ($bCopied, $lSizeCurrent, $lCopySize, $strCopyChecksum) = + backupFile($oFile, $$oMessage{db_file}, $$oMessage{backup_file}, $$oCommand{param}{compress}, + $$oMessage{checksum}, $$oMessage{checksum_only}, + $$oMessage{size}, $$oCommand{param}{size_total}, $lSizeCurrent); + + # If copy was successful store the checksum and size + if ($bCopied) + { + $$oCommand{param}{result_queue}->enqueue("checksum|$$oMessage{file_section}|" . + "$$oMessage{file}|${strCopyChecksum}|${lCopySize}"); + } + # Else the file was removed during backup so remove from manifest + else + { + $$oCommand{param}{result_queue}->enqueue("remove|$$oMessage{file_section}|". + "$$oMessage{file}"); + } + } + else + { + confess &log(ERROR, "unknown command"); + } + } + + # Even numbered threads move up when they have finished a queue, odd numbered threads move down + $iQueueIdx += $iDirection; + + # Reset the queue index when it goes over or under the number of queues + if ($iQueueIdx < 0) + { + $iQueueIdx = @{$$oCommand{param}{queue}} - 1; + } + elsif ($iQueueIdx >= @{$$oCommand{param}{queue}}) + { + $iQueueIdx = 0; + } + } + while ($iQueueIdx != $iQueueStartIdx); + + # Notify parent of shutdown + threadMessage($oyResultQueue[$iThreadIdx], 'shutdown'); + threadMessageExpect($oyMessageQueue[$iThreadIdx], 'continue'); + + # Destroy the file object + undef($oFile); + + # Notify the parent process of thread exit + $oyResultQueue[$iThreadIdx]->enqueue('complete'); + + &log(TRACE, "$$oCommand{function} thread exiting"); + } } #################################################################################################################################### -# COMPLETE +# threadMessage +#################################################################################################################################### +sub threadMessage +{ + my $oQueue = shift; + my $strMessage = shift; + my $iThreadIdx = shift; + + # Send the message + $oQueue->enqueue($strMessage); + + # Define calling context + &log(TRACE, "sent message '${strMessage}' to " . (defined($iThreadIdx) ? 'thread ' . ($iThreadIdx + 1) : 'controller')); +} + +#################################################################################################################################### +# threadMessageExpect +#################################################################################################################################### +sub threadMessageExpect +{ + my $oQueue = shift; + my $strExpected = shift; + my $iThreadIdx = shift; + my $bNoBlock = shift; + + # Set timeout based on the message type + my $iTimeout = defined($bNoBlock) ? undef: 600; + + # Define calling context + my $strContext = defined($iThreadIdx) ? 'thread ' . ($iThreadIdx + 1) : 'controller'; + + # Wait for the message + my $strMessage; + + if (defined($iTimeout)) + { + &log(TRACE, "waiting for '${strExpected}' message from ${strContext}"); + $strMessage = $oQueue->dequeue_timed($iTimeout); + } + else + { + $strMessage = $oQueue->dequeue_nb(); + + return false if !defined($strMessage); + } + + # Throw an exeception when the message was not received + if (!defined($strMessage) || $strMessage ne $strExpected) + { + confess &log(ASSERT, "expected message '$strExpected' from ${strContext} but " . + (defined($strMessage) ? "got '$strMessage'" : "timed out after ${iTimeout} second(s)")); + } + + &log(TRACE, "got '${strExpected}' message from ${strContext}"); + + return true; +} + +#################################################################################################################################### +# threadGroupRun +#################################################################################################################################### +sub threadGroupRun +{ + my $iThreadIdx = shift; + my $strFunction = shift; + my $oParam = shift; + + my %oCommand; + $oCommand{function} = $strFunction; + $oCommand{thread_total} = @oyThread; + $oCommand{param} = $oParam; + + $oyCommandQueue[$iThreadIdx]->enqueue(\%oCommand); + + threadMessageExpect($oyResultQueue[$iThreadIdx], 'init', $iThreadIdx); + $byThreadRunning[$iThreadIdx] = true; +} + +#################################################################################################################################### +# threadGroupComplete # # Wait for threads to complete. #################################################################################################################################### -sub thread_group_complete +sub threadGroupComplete { my $self = shift; my $iTimeout = shift; @@ -67,9 +274,13 @@ sub thread_group_complete # Wait for all threads to complete and handle errors my $iThreadComplete = 0; my $lTimeBegin = time(); + my $strFirstError; + my $iFirstErrorThreadIdx; + + &log(DEBUG, "waiting for " . @oyThread . " threads to complete"); # Rejoin the threads - while ($iThreadComplete < $self->{iThreadTotal}) + while ($iThreadComplete < @oyThread) { hsleep(.1); @@ -79,37 +290,43 @@ sub thread_group_complete if (time() - $lTimeBegin >= $iTimeout) { confess &log(ERROR, "threads have been running more than ${iTimeout} seconds, exiting..."); - - #backup_thread_kill(); - - #confess &log(WARN, "all threads have exited, aborting..."); } } - for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++) + for (my $iThreadIdx = 0; $iThreadIdx < @oyThread; $iThreadIdx++) { - if (defined($self->{oyThread}[$iThreadIdx])) + if ($byThreadRunning[$iThreadIdx]) { - if (defined($self->{oyThread}[$iThreadIdx]->error())) - { - $self->kill(); + my $oError = $oyThread[$iThreadIdx]->error(); - if ($bConfessOnError) + if (defined($oError)) + { + my $strError; + + if ($oError->isa('BackRest::Exception')) { - confess &log(ERROR, 'error in thread ' . (${iThreadIdx} + 1) . ': check log for details'); + $strError = $oError->message(); } else { - return false; + $strError = $oError; + &log(ERROR, "thread " . ($iThreadIdx) . ": ${strError}"); } - } - if ($self->{oyThread}[$iThreadIdx]->is_joinable()) + if (!defined($strFirstError)) + { + $strFirstError = $strError; + $iFirstErrorThreadIdx = $iThreadIdx; + } + + $byThreadRunning[$iThreadIdx] = false; + $iThreadComplete++; + } + elsif (threadMessageExpect($oyResultQueue[$iThreadIdx], 'shutdown', $iThreadIdx, true)) { - &log(DEBUG, "thread ${iThreadIdx} exited"); - $self->{oyThread}[$iThreadIdx]->join(); - &log(TRACE, "thread ${iThreadIdx} object undef"); - undef($self->{oyThread}[$iThreadIdx]); + threadMessage($oyMessageQueue[$iThreadIdx], 'continue', $iThreadIdx); + threadMessageExpect($oyResultQueue[$iThreadIdx], 'complete', $iThreadIdx); + $byThreadRunning[$iThreadIdx] = false; $iThreadComplete++; } } @@ -118,48 +335,46 @@ sub thread_group_complete &log(DEBUG, 'all threads exited'); - return true; + if (defined($strFirstError) && $bConfessOnError) + { + confess &log(ERROR, 'error in thread' . ($iFirstErrorThreadIdx + 1) . ": $strFirstError"); + } } #################################################################################################################################### -# KILL +# threadGroupDestroy #################################################################################################################################### -sub thread_group_destroy +sub threadGroupDestroy { my $self = shift; - # Total number of threads killed - my $iTotal = 0; + &log(TRACE, "waiting for " . @oyThread . " threads to be destroyed"); - for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++) + for (my $iThreadIdx = 0; $iThreadIdx < @oyThread; $iThreadIdx++) { - if (defined($self->{oyThread}[$iThreadIdx])) - { - if ($self->{oyThread}[$iThreadIdx]->is_running()) - { - $self->{oyThread}[$iThreadIdx]->kill('KILL')->join(); - } - elsif ($self->{oyThread}[$iThreadIdx]->is_joinable()) - { - $self->{oyThread}[$iThreadIdx]->join(); - } + my %oCommand; + $oCommand{function} = 'exit'; - undef($self->{oyThread}[$iThreadIdx]); - $iTotal++; + $oyCommandQueue[$iThreadIdx]->enqueue(\%oCommand); + hsleep(.1); + + if ($oyThread[$iThreadIdx]->is_running()) + { + $oyThread[$iThreadIdx]->kill('KILL')->join(); + &log(TRACE, "thread ${iThreadIdx} killed"); } + elsif ($oyThread[$iThreadIdx]->is_joinable()) + { + $oyThread[$iThreadIdx]->join(); + &log(TRACE, "thread ${iThreadIdx} joined"); + } + + undef($oyThread[$iThreadIdx]); } - return($iTotal); + &log(TRACE, @oyThread . " threads destroyed"); + + return(@oyThread); } -#################################################################################################################################### -# DESTRUCTOR -#################################################################################################################################### -# sub thread_group_destroy -# { -# my $self = shift; -# -# $self->kill(); -# } - 1; diff --git a/lib/BackRest/Utility.pm b/lib/BackRest/Utility.pm index 2a70ccaa2..c5566cce9 100644 --- a/lib/BackRest/Utility.pm +++ b/lib/BackRest/Utility.pm @@ -530,7 +530,8 @@ sub log # Format the message text my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time); - $strMessageFormat = timestamp_string_get() . sprintf(' T%02d', threads->tid()) . + $strMessageFormat = timestamp_string_get() . sprintf('.%03d T%02d', (gettimeofday() - int(gettimeofday())) * 1000, + threads->tid()) . (' ' x (7 - length($strLevel))) . "${strLevel}: ${strMessageFormat}\n"; # Output to console depending on log level and test flag