From 0979841f1a08352ece1ade8821d69080f82b1132 Mon Sep 17 00:00:00 2001 From: David Steele Date: Sat, 15 Feb 2014 14:18:15 -0500 Subject: [PATCH] Async compress, thread kill improvements --- pg_backrest.pl | 53 +++++----- pg_backrest_backup.pm | 224 +++++++++++++++++++++++++++++++++--------- pg_backrest_file.pm | 39 +++++++- 3 files changed, 238 insertions(+), 78 deletions(-) diff --git a/pg_backrest.pl b/pg_backrest.pl index 4839f59c3..e94c5a592 100755 --- a/pg_backrest.pl +++ b/pg_backrest.pl @@ -129,30 +129,7 @@ sub config_load #################################################################################################################################### sub safe_exit { - my $iTotal = 0; - -# for (my $iThreadIndex = 0; $iThreadIndex < scalar @pg_backrest_backup::oThread; $iThreadIndex++) -# { -# &log(INFO, "dequeueing thread ${iThreadIndex}"); -# -# $pg_backrest_backup::oThreadQueue[$iThreadIndex]->dequeue_nb(10000000000000); -# $pg_backrest_backup::oThreadQueue[$iThreadIndex]->enqueue(undef); -# } - - for (my $iThreadIndex = 0; $iThreadIndex < scalar @pg_backrest_backup::oThread; $iThreadIndex++) - { - &log(INFO, "joining thread ${iThreadIndex}"); - - $pg_backrest_backup::oThread[$iThreadIndex]->kill('KILL')->join(); - undef($pg_backrest_backup::oThread[$iThreadIndex]); - $iTotal++; - } - -# for (my $iIndex = 0; $iIndex < scalar @pg_backrest_db::oGlobalSSH; $iIndex++) -# { -# undef $pg_backrest_db::oGlobalSSH[$iIndex]; -# $iTotal++; -# } + my $iTotal = backup_thread_kill(); confess &log(ERROR, "process was terminated on signal, ${iTotal} threads stopped"); } @@ -304,6 +281,34 @@ if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL) my $bCompress = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false; my $bChecksum = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? 
true : false; + # Do async compression + if ($bCompressAsync) + { + # Run file_init_archive - this is the minimal config needed to run archive pulling !!! need to close the old file + my $oFile = pg_backrest_file->new + ( + strStanza => $strStanza, + bNoCompression => false, + strBackupPath => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_PATH, true), + strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum), + strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress), + strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress), + strCommandManifest => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_MANIFEST) + ); + + backup_init + ( + undef, + $oFile, + undef, + undef, + !$bChecksum, + config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_THREAD_MAX) + ); + + archive_compress($strArchivePath . "/archive/${strStanza}"); + } + # Run file_init_archive - this is the minimal config needed to run archive pulling !!! 
need to close the old file
     my $oFile = pg_backrest_file->new
     (
diff --git a/pg_backrest_backup.pm b/pg_backrest_backup.pm
index 82eae09e2..77be53317 100644
--- a/pg_backrest_backup.pm
+++ b/pg_backrest_backup.pm
@@ -25,7 +25,7 @@ use pg_backrest_db;
 
 use Exporter qw(import);
 
-our @EXPORT = qw(backup_init archive_push archive_pull backup backup_expire archive_list_get);
+our @EXPORT = qw(backup_init backup_thread_kill archive_push archive_pull archive_compress backup backup_expire archive_list_get);
 
 my $oDb;
 my @oFile;
@@ -33,13 +33,14 @@ my $strType = "incremental"; # Type of backup: full, differential (diff),
 my $bHardLink;
 my $bNoChecksum;
 my $iThreadMax;
+my $iThreadLocalMax;
 my $iThreadThreshold = 10;
 my $iSmallFileThreshold = 65536;
 my $bArchiveRequired;
 
 # Thread variables
-our @oThread;
-our @oThreadQueue;
+my @oThread;
+my @oThreadQueue;
 my %oFileCopyMap;
 
 ####################################################################################################################################
@@ -55,6 +56,11 @@ sub backup_init
     my $iThreadMaxParam = shift;
     my $bArchiveRequiredParam = shift;
 
+    for (my $iFileIdx = 0; $iFileIdx < scalar @oFile; $iFileIdx++)
+    {
+        undef($oFile[$iFileIdx]);
+    }
+
     $oDb = $oDbParam;
     $oFile[0] = $oFileParam;
     $strType = $strTypeParam;
@@ -80,9 +86,9 @@ sub backup_init
 sub thread_init
 {
     my $iThreadRequestTotal = shift; # Number of threads that were requested
-    
+
     my $iThreadActualTotal; # Number of actual threads assigned
-    
+
     if (!defined($iThreadRequestTotal))
     {
         $iThreadActualTotal = $iThreadMax;
@@ -96,7 +102,7 @@ sub thread_init
             $iThreadActualTotal = 1;
         }
     }
-    
+
     for (my $iThreadIdx = 0; $iThreadIdx < $iThreadActualTotal; $iThreadIdx++)
     {
         if (!defined($oThreadQueue[$iThreadIdx]))
@@ -113,6 +119,118 @@ sub thread_init
     return $iThreadActualTotal;
 }
 
+####################################################################################################################################
+# BACKUP_THREAD_KILL
+#
+# Stops and reaps every worker thread still held in @oThread, then clears its slot.
+#
+# A running thread is sent the 'KILL' thread signal and joined. Perl only acts on a thread signal when the thread has
+# installed a handler for it (as backup_file_thread does with $SIG{'KILL'}); a worker without one keeps running and the
+# join waits until it finishes on its own. A thread that has already finished is simply joined.
+#
+# Returns the number of thread slots that were cleared.
+####################################################################################################################################
+sub backup_thread_kill
+{
+    my $iTotal = 0;
+
+    for (my $iThreadIdx = 0; $iThreadIdx < scalar @oThread; $iThreadIdx++)
+    {
+        # Slots that were already joined elsewhere are undefined - skip them
+        if (defined($oThread[$iThreadIdx]))
+        {
+            if ($oThread[$iThreadIdx]->is_running())
+            {
+                $oThread[$iThreadIdx]->kill('KILL')->join();
+            }
+            elsif ($oThread[$iThreadIdx]->is_joinable())
+            {
+                $oThread[$iThreadIdx]->join();
+            }
+
+            undef($oThread[$iThreadIdx]);
+            $iTotal++;
+        }
+    }
+
+    return($iTotal);
+}
+
+####################################################################################################################################
+# BACKUP_THREAD_COMPLETE
+#
+# Waits for the first $iThreadLocalMax worker threads in @oThread to finish, joining each one as it completes and
+# polling once a second while any are still running.
+#
+# $bConfessOnError - optional, defaults to true. When true a thread error is fatal (confess); when false the function
+#                    returns false instead so the caller can carry on.
+#
+# Either way the remaining threads are stopped with backup_thread_kill() as soon as a thread error is seen.
+#
+# Returns true when every thread finished without an error.
+####################################################################################################################################
+sub backup_thread_complete
+{
+    my $bConfessOnError = shift;
+
+    if (!defined($bConfessOnError))
+    {
+        $bConfessOnError = true;
+    }
+
+    # Nothing to wait for when no thread set has been initialized yet
+    my $iThreadTotal = defined($iThreadLocalMax) ? $iThreadLocalMax : 0;
+
+    # Rejoin the threads
+    while (1)
+    {
+        my $iThreadPending = 0;
+
+        for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
+        {
+            # An empty slot has nothing to join - waiting on it would make this loop spin forever
+            if (!defined($oThread[$iThreadIdx]))
+            {
+                next;
+            }
+
+            if (defined($oThread[$iThreadIdx]->error()))
+            {
+                backup_thread_kill();
+
+                if ($bConfessOnError)
+                {
+                    confess &log(ERROR, "error in thread ${iThreadIdx}: check log for details");
+                }
+                else
+                {
+                    return false;
+                }
+            }
+
+            if ($oThread[$iThreadIdx]->is_joinable())
+            {
+                $oThread[$iThreadIdx]->join();
+                undef($oThread[$iThreadIdx]);
+            }
+            else
+            {
+                $iThreadPending++;
+            }
+        }
+
+        # Poll before sleeping so threads that have already finished do not cost a full second
+        if ($iThreadPending == 0)
+        {
+            last;
+        }
+
+        sleep(1);
+    }
+
+    return true;
+}
+
 ####################################################################################################################################
 # ARCHIVE_PUSH
 ####################################################################################################################################
@@ -143,7 +228,7 @@ sub 
archive_push sub archive_pull { my $strArchivePath = shift; - my $strCompressAsync = shift; + my $bCompressAsync = shift; # Load the archive manifest - all the files that need to be pushed my %oManifestHash = $oFile[0]->manifest_get(PATH_DB_ABSOLUTE, $strArchivePath); @@ -175,12 +260,12 @@ sub archive_pull &log(INFO, "archive to be copied to backup total ${lFileTotal}, size " . file_size_format($lFileSize)); # Init the thread variables - my $iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1); - my @oThread; + $iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1); my $iThreadIdx = 0; &log(DEBUG, "actual threads ${iThreadLocalMax}/${iThreadMax}"); + # Distribute files among the threads foreach my $strFile (sort @stryFile) { $oThreadQueue[$iThreadIdx]->enqueue($strFile); @@ -195,25 +280,9 @@ sub archive_pull $oThread[$iThreadIdx] = threads->create(\&archive_pull_copy_thread, $iThreadIdx, $strArchivePath); } - # Rejoin the threads - for ($iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) - { - $oThread[$iThreadIdx]->join(); - } - - # If there are errors then compress - # If compress async then go and compress all uncompressed archive files -# if ($bCompressAsync) -# { -# # Find all the archive files -# foreach my $strFile (@stryFile) -# { -# &log(DEBUG, "SHOULD BE LOGGING ${strFile}"); -# } -# } - - + backup_thread_complete(); + # Return 1 indicating that processing should continue return $lFileTotal; } @@ -241,6 +310,78 @@ sub archive_pull_copy_thread } } +sub archive_compress +{ + my $strArchivePath = shift; + + # Load the archive manifest - all the files that need to be pushed + my %oManifestHash = $oFile[0]->manifest_get(PATH_DB_ABSOLUTE, $strArchivePath); + + # Get all the files to be compressed and calculate the total size + my @stryFile; + my $lFileSize = 0; + my $lFileTotal = 0; + + foreach my $strFile (sort(keys $oManifestHash{name})) + { + if ($strFile =~ /^[0-F]{16}\/[0-F]{24}(\-[0-f]+){0,1}$/) + { + push 
@stryFile, $strFile; + + $lFileSize += $oManifestHash{name}{"$strFile"}{size}; + $lFileTotal++; + } + } + + if ($lFileTotal == 0) + { + &log(ERROR, "no archive logs to be compressed"); + + return; + } + + # Output files to be compressed + &log(INFO, "archive to be compressed total ${lFileTotal}, size " . file_size_format($lFileSize)); + + # Init the thread variables + $iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1); + my $iThreadIdx = 0; + + # Distribute files among the threads + foreach my $strFile (sort @stryFile) + { + $oThreadQueue[$iThreadIdx]->enqueue($strFile); + + $iThreadIdx = ($iThreadIdx + 1 == $iThreadLocalMax) ? 0 : $iThreadIdx + 1; + } + + # End each thread queue and start the thread + for ($iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) + { + $oThreadQueue[$iThreadIdx]->enqueue(undef); + $oThread[$iThreadIdx] = threads->create(\&archive_pull_compress_thread, $iThreadIdx, $strArchivePath); + } + + # Don't die on an error because we'd still like to try transferring + backup_thread_complete(false); +} + +sub archive_pull_compress_thread +{ + my @args = @_; + + my $iThreadIdx = $args[0]; + my $strArchivePath = $args[1]; + + while (my $strFile = $oThreadQueue[$iThreadIdx]->dequeue()) + { + &log(INFO, "thread ${iThreadIdx} compressing archive file ${strFile}"); + + # Compress the file + $oFile[$iThreadIdx]->file_compress(PATH_DB_ABSOLUTE, "${strArchivePath}/${strFile}"); + } +} + #################################################################################################################################### # BACKUP_REGEXP_GET - Generate a regexp depending on the backups that need to be found #################################################################################################################################### @@ -801,7 +942,7 @@ sub backup_file } # Build the thread queues - my $iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1); + $iThreadLocalMax = thread_init(int($lFileTotal / 
$iThreadThreshold) + 1); &log(DEBUG, "actual threads ${iThreadLocalMax}/${iThreadMax}"); # Initialize the thread size array @@ -877,24 +1018,7 @@ sub backup_file $oyThreadData[$iThreadIdx]{size}); } - # Wait for all threads to complete and handle errors - my $iThreadComplete = 0; - - # Rejoin the threads - while ($iThreadComplete < $iThreadLocalMax) - { - sleep(1); - - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) - { - if ($oThread[$iThreadIdx]->is_joinable()) - { - $oThread[$iThreadIdx]->join(); - undef($oThread[$iThreadIdx]); - $iThreadComplete++; - } - } - } + backup_thread_complete(); # # Look for errors # for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) @@ -914,9 +1038,11 @@ sub backup_file_thread my $bNoChecksum = $args[1]; my $bPathCreate = $args[2]; my $lSizeTotal = $args[3]; - + my $lSize = 0; - + + $SIG{'KILL'} = sub {threads->exit();}; + while (my $strFile = $oThreadQueue[$iThreadIdx]->dequeue()) { &log(INFO, "thread ${iThreadIdx} backing up file $oFileCopyMap{$strFile}{db_file} (" . @@ -1295,7 +1421,7 @@ sub backup_expire if (!defined($strArchiveLast)) { - &log(INFO, "invalid archive location retrieved ${$strArchiveRetentionBackup}"); + confess &log(INFO, "invalid archive location retrieved ${strArchiveRetentionBackup}"); } &log(INFO, "archive retention starts at " . $strArchiveLast); diff --git a/pg_backrest_file.pm b/pg_backrest_file.pm index 655abd735..3a4c108ef 100644 --- a/pg_backrest_file.pm +++ b/pg_backrest_file.pm @@ -528,11 +528,11 @@ sub file_copy # Generate the command string depending on compression/decompression/cat my $strCommand = $self->{strCommandCat}; - if ($bAlreadyCompressed && $bCompress) - { - $strDestination .= $strDestination =~ "^.*\.$self->{strCompressExtension}\$" ? ".gz" : ""; - } - elsif (!$bAlreadyCompressed && $bCompress) +# if ($bAlreadyCompressed && $bCompress) +# { +# $strDestination .= $strDestination =~ "^.*\.$self->{strCompressExtension}\$" ? 
".gz" : "";
+#    }
+    if (!$bAlreadyCompressed && $bCompress)
     {
         $strCommand = $self->{strCommandCompress};
         $strDestination .= ".gz";
@@ -680,6 +680,44 @@ sub file_hash_get
     return trim(capture($strCommand)) or confess &log(ERROR, "unable to checksum ${strPath}");
 }
 
+####################################################################################################################################
+# FILE_COMPRESS
+#
+# Compresses a local file by running the configured compress command (strCommandCompress) on it.
+#
+# $strPathType - path type, passed to is_remote() and path_get()
+# $strFile     - file name, resolved to a full path with path_get()
+#
+# Confesses if the path type is remote, if no compress command is configured or if the command exits non-zero.
+####################################################################################################################################
+sub file_compress
+{
+    my $self = shift;
+    my $strPathType = shift;
+    my $strFile = shift;
+
+    # For now this operation is not supported remotely. Not currently needed.
+    if ($self->is_remote($strPathType))
+    {
+        confess &log(ASSERT, "remote operation not supported");
+    }
+
+    if (!defined($self->{strCommandCompress}))
+    {
+        confess &log(ASSERT, "\$strCommandCompress not defined");
+    }
+
+    my $strPath = $self->path_get($strPathType, $strFile);
+
+    # Build the command - the file path replaces the %file% placeholder and --stdout is dropped (presumably so the
+    # command compresses the file in place instead of streaming it - confirm against the configured command)
+    # NOTE(review): the path is interpolated unquoted into a shell command line
+    my $strCommand = $self->{strCommandCompress};
+    $strCommand =~ s/\%file\%/${strPath}/g;
+    $strCommand =~ s/\ \-\-stdout//g;
+
+    system($strCommand) == 0 or confess &log(ERROR, "unable to compress ${strPath}: ${strCommand}");
+}
 
 ####################################################################################################################################
 # FILE_LIST_GET