diff --git a/pg_backrest.conf b/pg_backrest.conf index 9d97ce407..687774707 100644 --- a/pg_backrest.conf +++ b/pg_backrest.conf @@ -12,10 +12,11 @@ user=backrest host=localhost path=/Users/backrest/test archive_required=n +thread-max=2 [global:archive] path=/Users/dsteele/test -compress=n +compress-async=y [global:retention] full_retention=2 diff --git a/pg_backrest.pl b/pg_backrest.pl index 975d02ea1..990a78108 100755 --- a/pg_backrest.pl +++ b/pg_backrest.pl @@ -37,7 +37,10 @@ use constant CONFIG_KEY_HOST => "host", CONFIG_KEY_PATH => "path", + CONFIG_KEY_THREAD_MAX => "thread-max", + CONFIG_KEY_COMPRESS => "compress", + CONFIG_KEY_COMPRESS_ASYNC => "compress-async", CONFIG_KEY_DECOMPRESS => "decompress", CONFIG_KEY_CHECKSUM => "checksum", CONFIG_KEY_MANIFEST => "manifest", @@ -129,6 +132,7 @@ if (!defined($strOperation)) } if ($strOperation ne OP_ARCHIVE_PUSH && + $strOperation ne OP_ARCHIVE_PULL && $strOperation ne OP_BACKUP && $strOperation ne OP_EXPIRE) { @@ -168,93 +172,137 @@ if (!defined($strStanza)) #################################################################################################################################### # ARCHIVE-PUSH Command #################################################################################################################################### -if ($strOperation eq OP_ARCHIVE_PUSH) +if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL) { # If an archive section has been defined, use that instead of the backup section when operation is OP_ARCHIVE_PUSH my $strSection = defined(config_load(CONFIG_SECTION_ARCHIVE, CONFIG_KEY_PATH)) ? CONFIG_SECTION_ARCHIVE : CONFIG_SECTION_BACKUP; + + # Get the async compress flag. If compress_async=y then compression is off for the initial push + my $bCompressAsync = config_load($strSection, CONFIG_KEY_COMPRESS_ASYNC, true, "n") eq "n" ? false : true; + + if ($strOperation eq OP_ARCHIVE_PUSH) + { + # Make sure that archive-push is running locally + if (defined(config_load(CONFIG_SECTION_STANZA, CONFIG_KEY_HOST))) + { + confess &log(ERROR, "stanza host cannot be set on archive-push - must be run locally on db server"); + } - # Get the operational flags - my $bCompress = config_load($strSection, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false; - my $bChecksum = config_load($strSection, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false; + # Make sure that compress and compress_async are not both set + # if (defined(config_load($strSection, CONFIG_KEY_COMPRESS)) && defined(config_load($strSection, CONFIG_KEY_COMPRESS_ASYNC))) + # { + # confess &log(ERROR, "compress and compress_async cannot both be set"); + # } - # Run file_init_archive - this is the minimal config needed to run archiving - my $oFile = pg_backrest_file->new - ( - strStanza => $strStanza, - bNoCompression => !$bCompress, - strBackupUser => config_load($strSection, CONFIG_KEY_USER), - strBackupHost => config_load($strSection, CONFIG_KEY_HOST), - strBackupPath => config_load($strSection, CONFIG_KEY_PATH, true), - strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum), - strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress), - strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress) - ); + # Get the compress flag + my $bCompress = $bCompressAsync ? false : config_load($strSection, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false; - backup_init - ( - undef, - $oFile, - undef, - undef, - !$bChecksum - ); + # Get the checksum flag + my $bChecksum = config_load($strSection, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false; - # Call the archive_push function - if (!defined($ARGV[1])) - { - confess &log(ERROR, "source archive file not provided - show usage"); + # Run file_init_archive - this is the minimal config needed to run archiving + my $oFile = pg_backrest_file->new + ( + strStanza => $strStanza, + bNoCompression => !$bCompress, + strBackupUser => config_load($strSection, CONFIG_KEY_USER), + strBackupHost => config_load($strSection, CONFIG_KEY_HOST), + strBackupPath => config_load($strSection, CONFIG_KEY_PATH, true), + strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum), + strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress), + strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress) + ); + + backup_init + ( + undef, + $oFile, + undef, + undef, + !$bChecksum + ); + + # Call the archive_push function + if (!defined($ARGV[1])) + { + confess &log(ERROR, "source archive file not provided - show usage"); + } + + archive_push($ARGV[1]); + + # Only continue if we are archiving local and a backup server is defined + if (!($strSection eq CONFIG_SECTION_ARCHIVE && defined(config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST)))) + { + exit 0; + } + + # Set the operation so that archive-pull will be called next + $strOperation = OP_ARCHIVE_PULL; + + # fork and exit the parent process + if (fork()) + { + exit 0; + } } - archive_push($ARGV[1]); - - # Is backup section defined? If so fork, exit parent, and push archive logs to the backup server asynchronously afterwards - if ($strSection eq CONFIG_SECTION_ARCHIVE && defined(config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST)) && fork()) + if ($strOperation eq OP_ARCHIVE_PULL) { - exit 0; - } + # Make sure that archive-pull is running on the db server + if (defined(config_load(CONFIG_SECTION_STANZA, CONFIG_KEY_HOST))) + { + confess &log(ERROR, "stanza host cannot be set on archive-pull - must be run locally on db server"); + } + + # Create a lock file to make sure archive-pull does not run more than once + my $strArchivePath = config_load(CONFIG_SECTION_ARCHIVE, CONFIG_KEY_PATH); + my $strLockFile = "${strArchivePath}/lock/archive-${strStanza}.lock"; + my $fLockFile; - # Create a lock file to make sure archive-pull does not run more than once - my $strArchivePath = config_load(CONFIG_SECTION_ARCHIVE, CONFIG_KEY_PATH); - my $fLockFile; + sysopen($fLockFile, $strLockFile, O_WRONLY | O_CREAT) + or confess &log(ERROR, "unable to open lock file ${strLockFile}"); + + if (!flock($fLockFile, LOCK_EX | LOCK_NB)) + { + &log(INFO, "archive-pull process is already running - exiting"); + exit 0 + } + + # Get the new operational flags + my $bCompress = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false; + my $bChecksum = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false; + + # Run file_init_archive - this is the minimal config needed to run archive pulling !!! need to close the old file + my $oFile = pg_backrest_file->new + ( + strStanza => $strStanza, + bNoCompression => !$bCompress, + strBackupUser => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_USER), + strBackupHost => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST), + strBackupPath => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_PATH, true), + strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum), + strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress), + strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress), + strCommandManifest => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_MANIFEST) + ); + + backup_init + ( + undef, + $oFile, + undef, + undef, + !$bChecksum, + config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_THREAD_MAX) + ); + + # Call the archive_pull function Continue to loop as long as there are files to process. + while (archive_pull($strArchivePath . "/archive/${strStanza}", $bCompressAsync)) + { + sleep(5); + } + } - sysopen($fLockFile, "${strArchivePath}/${strStanza}/archive.lock", O_WRONLY | O_CREAT) or die; - - if (!flock($fLockFile, LOCK_EX | LOCK_NB)) - { - &log(INFO, "archive-pull process is already running - exiting"); - exit 0 - } - - # Get the new operational flags - $bCompress = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false; - $bChecksum = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false; - - # Run file_init_archive - this is the minimal config needed to run archive pulling !!! need to close the old file - $oFile = pg_backrest_file->new - ( - strStanza => $strStanza, - bNoCompression => !$bCompress, - strBackupUser => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_USER), - strBackupHost => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST), - strBackupPath => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_PATH, true), - strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum), - strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress), - strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress), - strCommandManifest => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_MANIFEST) - ); - - backup_init - ( - undef, - $oFile, - undef, - undef, - !$bChecksum - ); - - # Call the archive_pull function - archive_pull($strArchivePath); - exit 0; } @@ -315,7 +363,7 @@ backup_init $strType, config_load(CONFIG_SECTION_BACKUP, "hardlink", true, "n") eq "y" ? true : false, !$bChecksum, - config_load(CONFIG_SECTION_BACKUP, "thread"), + config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_THREAD_MAX), config_load(CONFIG_SECTION_BACKUP, "archive_required", true, "y") eq "y" ? true : false ); diff --git a/pg_backrest_backup.pm b/pg_backrest_backup.pm index 64899d42d..5b76371cd 100644 --- a/pg_backrest_backup.pm +++ b/pg_backrest_backup.pm @@ -32,7 +32,8 @@ my $oFile; my $strType = "incremental"; # Type of backup: full, differential (diff), incremental (incr) my $bHardLink; my $bNoChecksum; -my $iThreadTotal; +my $iThreadMax; +my $iThreadThreshold = 10; my $bArchiveRequired; # Thread variables @@ -50,7 +51,7 @@ sub backup_init my $strTypeParam = shift; my $bHardLinkParam = shift; my $bNoChecksumParam = shift; - my $iThreadTotalParam = shift; + my $iThreadMaxParam = shift; my $bArchiveRequiredParam = shift; $oDb = $oDbParam; @@ -58,13 +59,57 @@ sub backup_init $strType = $strTypeParam; $bHardLink = $bHardLinkParam; $bNoChecksum = $bNoChecksumParam; - $iThreadTotal = $iThreadTotalParam; + $iThreadMax = $iThreadMaxParam; $bArchiveRequired = $bArchiveRequiredParam; - if (!defined($iThreadTotal)) + if (!defined($iThreadMax)) { - $iThreadTotal = 1; + $iThreadMax = 1; } + + if ($iThreadMax < 1 || $iThreadMax > 32) + { + confess &log(ERROR, "thread_max must be between 1 and 32"); + } +} + +#################################################################################################################################### +# THREAD_INIT +#################################################################################################################################### +sub thread_init +{ + my $iThreadRequestTotal = shift; # Number of threads that were requested + + my $iThreadActualTotal; # Number of actual threads assigned + + if (!defined($iThreadRequestTotal)) + { + $iThreadActualTotal = $iThreadMax; + } + else + { + $iThreadActualTotal = $iThreadRequestTotal < $iThreadMax ? $iThreadRequestTotal : $iThreadMax; + + if ($iThreadActualTotal < 1) + { + $iThreadActualTotal = 1; + } + } + + for (my $iThreadIdx = 0; $iThreadIdx < $iThreadActualTotal; $iThreadIdx++) + { + if (!defined($oThreadQueue[$iThreadIdx])) + { + $oThreadQueue[$iThreadIdx] = Thread::Queue->new(); + } + + if (!defined($oThreadFile[$iThreadIdx])) + { + $oThreadFile[$iThreadIdx] = $oFile->clone($iThreadIdx); + } + } + + return $iThreadActualTotal; } #################################################################################################################################### @@ -97,14 +142,15 @@ sub archive_push sub archive_pull { my $strArchivePath = shift; - my $strCompressLocal = shift; + my $strCompressAsync = shift; # Load the archive manifest - all the files that need to be pushed - my %oManifestHash = $oFile->manifest_get(PATH_DB_ABSOLUTE, $strArchivePath . "/archive/" . ${oFile}->{strStanza}); + my %oManifestHash = $oFile->manifest_get(PATH_DB_ABSOLUTE, $strArchivePath); # Get all the files to be transferred and calculate the total size my @stryFile; my $lFileSize = 0; + my $lFileTotal = 0; foreach my $strFile (sort(keys $oManifestHash{name})) { @@ -113,15 +159,80 @@ sub archive_pull push @stryFile, $strFile; $lFileSize += $oManifestHash{name}{"$strFile"}{size}; + $lFileTotal++; } } - &log(INFO, "total archive to be copied to backup " . (${lFileSize} / 1024 / 1024 ) . "MB"); - - # Find all the archive files - foreach my $strFile (@stryFile) + if ($lFileTotal == 0) { - &log(DEBUG, "SHOULD BE LOGGING ${strFile}"); + &log(INFO, "no archive logs to be copied to backup"); + + return 0; + } + + # Output files to be moved to backup + &log(INFO, "archive to be copied to backup total ${lFileTotal}, size " . (${lFileSize} / 1024 / 1024 ) . "MB"); + + # Init the thread variables + my $iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold)); + my @oThread; + my $iThreadIdx = 0; + + &log(INFO, "actual threads ${iThreadLocalMax}/${iThreadMax}"); + + # If compress async then go and compress all uncompressed archive files +# if ($bCompressAsync) +# { +# # Find all the archive files +# foreach my $strFile (@stryFile) +# { +# &log(DEBUG, "SHOULD BE LOGGING ${strFile}"); +# } +# } + + foreach my $strFile (sort @stryFile) + { + $oThreadQueue[$iThreadIdx]->enqueue($strFile); + + $iThreadIdx = ($iThreadIdx + 1 == $iThreadLocalMax) ? 0 : $iThreadIdx + 1; + } + + # End each thread queue and start the thread + for ($iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) + { + $oThreadQueue[$iThreadIdx]->enqueue(undef); + $oThread[$iThreadIdx] = threads->create(\&archive_pull_copy_thread, $iThreadIdx, $strArchivePath); + } + + # Rejoin the threads + for ($iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++) + { + $oThread[$iThreadIdx]->join(); + } + + return $lFileTotal; +} + +sub archive_pull_copy_thread +{ + my @args = @_; + + my $iThreadIdx = $args[0]; + my $strArchivePath = $args[1]; + + while (my $strFile = $oThreadQueue[$iThreadIdx]->dequeue()) + { + &log(DEBUG, " thread ${iThreadIdx} backing up archive file ${strFile}"); + my $strArchiveFile = "${strArchivePath}/${strFile}"; + + # Copy the file + $oThreadFile[$iThreadIdx]->file_copy(PATH_DB_ABSOLUTE, $strArchiveFile, + PATH_BACKUP_ARCHIVE, basename($strFile), + undef, undef, + undef); # cannot set permissions remotely yet $oFile->{strDefaultFilePermission}); + + # Remove the source archive file + unlink($strArchiveFile) or confess &log(ERROR, "unable to remove ${strArchiveFile}"); } } @@ -528,7 +639,7 @@ sub backup_file # Build the thread queues my @oThread; - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++) + for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++) { $oThreadFile[$iThreadIdx] = $oFile->clone($iThreadIdx); $oThreadQueue[$iThreadIdx] = Thread::Queue->new(); @@ -536,12 +647,12 @@ sub backup_file # Assign files to each thread queue my $iThreadFileSmallIdx = 0; - my $iThreadFileSmallTotalMax = $lFileSmallTotal / $iThreadTotal; + my $iThreadFileSmallTotalMax = $lFileSmallTotal / $iThreadMax; my $fThreadFileSmallSize = 0; my $iThreadFileSmallTotal = 0; my $iThreadFileLargeIdx = 0; - my $fThreadFileLargeSizeMax = $lFileLargeSize / $iThreadTotal; + my $fThreadFileLargeSizeMax = $lFileLargeSize / $iThreadMax; my $fThreadFileLargeSize = 0; my $iThreadFileLargeTotal = 0; @@ -560,7 +671,7 @@ sub backup_file $fThreadFileLargeSize += $lFileSize; $iThreadFileLargeTotal++; - if ($fThreadFileLargeSize >= $fThreadFileLargeSizeMax && $iThreadFileLargeIdx < $iThreadTotal - 1) + if ($fThreadFileLargeSize >= $fThreadFileLargeSizeMax && $iThreadFileLargeIdx < $iThreadMax - 1) { &log(DEBUG, " thread ${iThreadFileLargeIdx} large total ${iThreadFileLargeTotal}, size ${fThreadFileLargeSize}"); @@ -576,7 +687,7 @@ sub backup_file $fThreadFileSmallSize += $lFileSize; $iThreadFileSmallTotal++; - if ($iThreadFileSmallTotal >= $iThreadFileSmallTotalMax && $iThreadFileSmallIdx < $iThreadTotal - 1) + if ($iThreadFileSmallTotal >= $iThreadFileSmallTotalMax && $iThreadFileSmallIdx < $iThreadMax - 1) { &log(DEBUG, " thread ${iThreadFileSmallIdx} small total ${iThreadFileSmallTotal}, size ${fThreadFileSmallSize}"); @@ -591,14 +702,14 @@ sub backup_file &log(DEBUG, " thread ${iThreadFileSmallIdx} small total ${iThreadFileSmallTotal}, size ${fThreadFileSmallSize}"); # End each thread queue and start the thread - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++) + for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++) { $oThreadQueue[$iThreadIdx]->enqueue(undef); $oThread[$iThreadIdx] = threads->create(\&backup_file_thread, $iThreadIdx, $bNoChecksum); } # Rejoin the threads - for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++) + for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++) { $oThread[$iThreadIdx]->join(); } @@ -774,6 +885,9 @@ sub backup } } + # Need some sort of backup validate - create a manifest and compare the backup manifest + # !!! DO IT + # Save the backup conf file final time backup_manifest_save($strBackupConfFile, \%oBackupManifest);