1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-03-05 15:05:48 +02:00

Async archive log backup working.

This commit is contained in:
David Steele 2014-02-13 12:02:45 -05:00
parent f0df759bb9
commit 14b13d4840
3 changed files with 259 additions and 96 deletions

View File

@ -12,10 +12,11 @@ user=backrest
host=localhost
path=/Users/backrest/test
archive_required=n
thread-max=2
[global:archive]
path=/Users/dsteele/test
compress=n
compress-async=y
[global:retention]
full_retention=2

View File

@ -37,7 +37,10 @@ use constant
CONFIG_KEY_HOST => "host",
CONFIG_KEY_PATH => "path",
CONFIG_KEY_THREAD_MAX => "thread-max",
CONFIG_KEY_COMPRESS => "compress",
CONFIG_KEY_COMPRESS_ASYNC => "compress-async",
CONFIG_KEY_DECOMPRESS => "decompress",
CONFIG_KEY_CHECKSUM => "checksum",
CONFIG_KEY_MANIFEST => "manifest",
@ -129,6 +132,7 @@ if (!defined($strOperation))
}
if ($strOperation ne OP_ARCHIVE_PUSH &&
$strOperation ne OP_ARCHIVE_PULL &&
$strOperation ne OP_BACKUP &&
$strOperation ne OP_EXPIRE)
{
@ -168,93 +172,137 @@ if (!defined($strStanza))
####################################################################################################################################
# ARCHIVE-PUSH Command
####################################################################################################################################
if ($strOperation eq OP_ARCHIVE_PUSH)
if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL)
{
# If an archive section has been defined, use that instead of the backup section when operation is OP_ARCHIVE_PUSH
my $strSection = defined(config_load(CONFIG_SECTION_ARCHIVE, CONFIG_KEY_PATH)) ? CONFIG_SECTION_ARCHIVE : CONFIG_SECTION_BACKUP;
# Get the async compress flag. If compress_async=y then compression is off for the initial push
my $bCompressAsync = config_load($strSection, CONFIG_KEY_COMPRESS_ASYNC, true, "n") eq "n" ? false : true;
if ($strOperation eq OP_ARCHIVE_PUSH)
{
# Make sure that archive-push is running locally
if (defined(config_load(CONFIG_SECTION_STANZA, CONFIG_KEY_HOST)))
{
confess &log(ERROR, "stanza host cannot be set on archive-push - must be run locally on db server");
}
# Get the operational flags
my $bCompress = config_load($strSection, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false;
my $bChecksum = config_load($strSection, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false;
# Make sure that compress and compress_async are not both set
# if (defined(config_load($strSection, CONFIG_KEY_COMPRESS)) && defined(config_load($strSection, CONFIG_KEY_COMPRESS_ASYNC)))
# {
# confess &log(ERROR, "compress and compress_async cannot both be set");
# }
# Run file_init_archive - this is the minimal config needed to run archiving
my $oFile = pg_backrest_file->new
(
strStanza => $strStanza,
bNoCompression => !$bCompress,
strBackupUser => config_load($strSection, CONFIG_KEY_USER),
strBackupHost => config_load($strSection, CONFIG_KEY_HOST),
strBackupPath => config_load($strSection, CONFIG_KEY_PATH, true),
strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum),
strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress),
strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress)
);
# Get the compress flag
my $bCompress = $bCompressAsync ? false : config_load($strSection, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false;
backup_init
(
undef,
$oFile,
undef,
undef,
!$bChecksum
);
# Get the checksum flag
my $bChecksum = config_load($strSection, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false;
# Call the archive_push function
if (!defined($ARGV[1]))
{
confess &log(ERROR, "source archive file not provided - show usage");
# Run file_init_archive - this is the minimal config needed to run archiving
my $oFile = pg_backrest_file->new
(
strStanza => $strStanza,
bNoCompression => !$bCompress,
strBackupUser => config_load($strSection, CONFIG_KEY_USER),
strBackupHost => config_load($strSection, CONFIG_KEY_HOST),
strBackupPath => config_load($strSection, CONFIG_KEY_PATH, true),
strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum),
strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress),
strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress)
);
backup_init
(
undef,
$oFile,
undef,
undef,
!$bChecksum
);
# Call the archive_push function
if (!defined($ARGV[1]))
{
confess &log(ERROR, "source archive file not provided - show usage");
}
archive_push($ARGV[1]);
# Only continue if we are archiving local and a backup server is defined
if (!($strSection eq CONFIG_SECTION_ARCHIVE && defined(config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST))))
{
exit 0;
}
# Set the operation so that archive-pull will be called next
$strOperation = OP_ARCHIVE_PULL;
# fork and exit the parent process
if (fork())
{
exit 0;
}
}
archive_push($ARGV[1]);
# Is backup section defined? If so fork, exit parent, and push archive logs to the backup server asynchronously afterwards
if ($strSection eq CONFIG_SECTION_ARCHIVE && defined(config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST)) && fork())
if ($strOperation eq OP_ARCHIVE_PULL)
{
exit 0;
}
# Make sure that archive-pull is running on the db server
if (defined(config_load(CONFIG_SECTION_STANZA, CONFIG_KEY_HOST)))
{
confess &log(ERROR, "stanza host cannot be set on archive-pull - must be run locally on db server");
}
# Create a lock file to make sure archive-pull does not run more than once
my $strArchivePath = config_load(CONFIG_SECTION_ARCHIVE, CONFIG_KEY_PATH);
my $strLockFile = "${strArchivePath}/lock/archive-${strStanza}.lock";
my $fLockFile;
# Create a lock file to make sure archive-pull does not run more than once
my $strArchivePath = config_load(CONFIG_SECTION_ARCHIVE, CONFIG_KEY_PATH);
my $fLockFile;
sysopen($fLockFile, $strLockFile, O_WRONLY | O_CREAT)
or confess &log(ERROR, "unable to open lock file ${strLockFile}");
if (!flock($fLockFile, LOCK_EX | LOCK_NB))
{
&log(INFO, "archive-pull process is already running - exiting");
exit 0
}
# Get the new operational flags
my $bCompress = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false;
my $bChecksum = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false;
# Run file_init_archive - this is the minimal config needed to run archive pulling !!! need to close the old file
my $oFile = pg_backrest_file->new
(
strStanza => $strStanza,
bNoCompression => !$bCompress,
strBackupUser => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_USER),
strBackupHost => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST),
strBackupPath => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_PATH, true),
strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum),
strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress),
strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress),
strCommandManifest => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_MANIFEST)
);
backup_init
(
undef,
$oFile,
undef,
undef,
!$bChecksum,
config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_THREAD_MAX)
);
# Call the archive_pull function Continue to loop as long as there are files to process.
while (archive_pull($strArchivePath . "/archive/${strStanza}", $bCompressAsync))
{
sleep(5);
}
}
sysopen($fLockFile, "${strArchivePath}/${strStanza}/archive.lock", O_WRONLY | O_CREAT) or die;
if (!flock($fLockFile, LOCK_EX | LOCK_NB))
{
&log(INFO, "archive-pull process is already running - exiting");
exit 0
}
# Get the new operational flags
$bCompress = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false;
$bChecksum = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false;
# Run file_init_archive - this is the minimal config needed to run archive pulling !!! need to close the old file
$oFile = pg_backrest_file->new
(
strStanza => $strStanza,
bNoCompression => !$bCompress,
strBackupUser => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_USER),
strBackupHost => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST),
strBackupPath => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_PATH, true),
strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum),
strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress),
strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress),
strCommandManifest => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_MANIFEST)
);
backup_init
(
undef,
$oFile,
undef,
undef,
!$bChecksum
);
# Call the archive_pull function
archive_pull($strArchivePath);
exit 0;
}
@ -315,7 +363,7 @@ backup_init
$strType,
config_load(CONFIG_SECTION_BACKUP, "hardlink", true, "n") eq "y" ? true : false,
!$bChecksum,
config_load(CONFIG_SECTION_BACKUP, "thread"),
config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_THREAD_MAX),
config_load(CONFIG_SECTION_BACKUP, "archive_required", true, "y") eq "y" ? true : false
);

View File

@ -32,7 +32,8 @@ my $oFile;
my $strType = "incremental"; # Type of backup: full, differential (diff), incremental (incr)
my $bHardLink;
my $bNoChecksum;
my $iThreadTotal;
my $iThreadMax;
my $iThreadThreshold = 10;
my $bArchiveRequired;
# Thread variables
@ -50,7 +51,7 @@ sub backup_init
my $strTypeParam = shift;
my $bHardLinkParam = shift;
my $bNoChecksumParam = shift;
my $iThreadTotalParam = shift;
my $iThreadMaxParam = shift;
my $bArchiveRequiredParam = shift;
$oDb = $oDbParam;
@ -58,13 +59,57 @@ sub backup_init
$strType = $strTypeParam;
$bHardLink = $bHardLinkParam;
$bNoChecksum = $bNoChecksumParam;
$iThreadTotal = $iThreadTotalParam;
$iThreadMax = $iThreadMaxParam;
$bArchiveRequired = $bArchiveRequiredParam;
if (!defined($iThreadTotal))
if (!defined($iThreadMax))
{
$iThreadTotal = 1;
$iThreadMax = 1;
}
if ($iThreadMax < 1 || $iThreadMax > 32)
{
confess &log(ERROR, "thread_max must be between 1 and 32");
}
}
####################################################################################################################################
# THREAD_INIT
####################################################################################################################################
sub thread_init
{
my $iThreadRequestTotal = shift; # Number of threads that were requested
my $iThreadActualTotal; # Number of actual threads assigned
if (!defined($iThreadRequestTotal))
{
$iThreadActualTotal = $iThreadMax;
}
else
{
$iThreadActualTotal = $iThreadRequestTotal < $iThreadMax ? $iThreadRequestTotal : $iThreadMax;
if ($iThreadActualTotal < 1)
{
$iThreadActualTotal = 1;
}
}
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadActualTotal; $iThreadIdx++)
{
if (!defined($oThreadQueue[$iThreadIdx]))
{
$oThreadQueue[$iThreadIdx] = Thread::Queue->new();
}
if (!defined($oThreadFile[$iThreadIdx]))
{
$oThreadFile[$iThreadIdx] = $oFile->clone($iThreadIdx);
}
}
return $iThreadActualTotal;
}
####################################################################################################################################
@ -97,14 +142,15 @@ sub archive_push
sub archive_pull
{
my $strArchivePath = shift;
my $strCompressLocal = shift;
my $strCompressAsync = shift;
# Load the archive manifest - all the files that need to be pushed
my %oManifestHash = $oFile->manifest_get(PATH_DB_ABSOLUTE, $strArchivePath . "/archive/" . ${oFile}->{strStanza});
my %oManifestHash = $oFile->manifest_get(PATH_DB_ABSOLUTE, $strArchivePath);
# Get all the files to be transferred and calculate the total size
my @stryFile;
my $lFileSize = 0;
my $lFileTotal = 0;
foreach my $strFile (sort(keys $oManifestHash{name}))
{
@ -113,15 +159,80 @@ sub archive_pull
push @stryFile, $strFile;
$lFileSize += $oManifestHash{name}{"$strFile"}{size};
$lFileTotal++;
}
}
&log(INFO, "total archive to be copied to backup " . (${lFileSize} / 1024 / 1024 ) . "MB");
# Find all the archive files
foreach my $strFile (@stryFile)
if ($lFileTotal == 0)
{
&log(DEBUG, "SHOULD BE LOGGING ${strFile}");
&log(INFO, "no archive logs to be copied to backup");
return 0;
}
# Output files to be moved to backup
&log(INFO, "archive to be copied to backup total ${lFileTotal}, size " . (${lFileSize} / 1024 / 1024 ) . "MB");
# Init the thread variables
my $iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold));
my @oThread;
my $iThreadIdx = 0;
&log(INFO, "actual threads ${iThreadLocalMax}/${iThreadMax}");
# If compress async then go and compress all uncompressed archive files
# if ($bCompressAsync)
# {
# # Find all the archive files
# foreach my $strFile (@stryFile)
# {
# &log(DEBUG, "SHOULD BE LOGGING ${strFile}");
# }
# }
foreach my $strFile (sort @stryFile)
{
$oThreadQueue[$iThreadIdx]->enqueue($strFile);
$iThreadIdx = ($iThreadIdx + 1 == $iThreadLocalMax) ? 0 : $iThreadIdx + 1;
}
# End each thread queue and start the thread
for ($iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
{
$oThreadQueue[$iThreadIdx]->enqueue(undef);
$oThread[$iThreadIdx] = threads->create(\&archive_pull_copy_thread, $iThreadIdx, $strArchivePath);
}
# Rejoin the threads
for ($iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
{
$oThread[$iThreadIdx]->join();
}
return $lFileTotal;
}
sub archive_pull_copy_thread
{
my @args = @_;
my $iThreadIdx = $args[0];
my $strArchivePath = $args[1];
while (my $strFile = $oThreadQueue[$iThreadIdx]->dequeue())
{
&log(DEBUG, " thread ${iThreadIdx} backing up archive file ${strFile}");
my $strArchiveFile = "${strArchivePath}/${strFile}";
# Copy the file
$oThreadFile[$iThreadIdx]->file_copy(PATH_DB_ABSOLUTE, $strArchiveFile,
PATH_BACKUP_ARCHIVE, basename($strFile),
undef, undef,
undef); # cannot set permissions remotely yet $oFile->{strDefaultFilePermission});
# Remove the source archive file
unlink($strArchiveFile) or confess &log(ERROR, "unable to remove ${strArchiveFile}");
}
}
@ -528,7 +639,7 @@ sub backup_file
# Build the thread queues
my @oThread;
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++)
{
$oThreadFile[$iThreadIdx] = $oFile->clone($iThreadIdx);
$oThreadQueue[$iThreadIdx] = Thread::Queue->new();
@ -536,12 +647,12 @@ sub backup_file
# Assign files to each thread queue
my $iThreadFileSmallIdx = 0;
my $iThreadFileSmallTotalMax = $lFileSmallTotal / $iThreadTotal;
my $iThreadFileSmallTotalMax = $lFileSmallTotal / $iThreadMax;
my $fThreadFileSmallSize = 0;
my $iThreadFileSmallTotal = 0;
my $iThreadFileLargeIdx = 0;
my $fThreadFileLargeSizeMax = $lFileLargeSize / $iThreadTotal;
my $fThreadFileLargeSizeMax = $lFileLargeSize / $iThreadMax;
my $fThreadFileLargeSize = 0;
my $iThreadFileLargeTotal = 0;
@ -560,7 +671,7 @@ sub backup_file
$fThreadFileLargeSize += $lFileSize;
$iThreadFileLargeTotal++;
if ($fThreadFileLargeSize >= $fThreadFileLargeSizeMax && $iThreadFileLargeIdx < $iThreadTotal - 1)
if ($fThreadFileLargeSize >= $fThreadFileLargeSizeMax && $iThreadFileLargeIdx < $iThreadMax - 1)
{
&log(DEBUG, " thread ${iThreadFileLargeIdx} large total ${iThreadFileLargeTotal}, size ${fThreadFileLargeSize}");
@ -576,7 +687,7 @@ sub backup_file
$fThreadFileSmallSize += $lFileSize;
$iThreadFileSmallTotal++;
if ($iThreadFileSmallTotal >= $iThreadFileSmallTotalMax && $iThreadFileSmallIdx < $iThreadTotal - 1)
if ($iThreadFileSmallTotal >= $iThreadFileSmallTotalMax && $iThreadFileSmallIdx < $iThreadMax - 1)
{
&log(DEBUG, " thread ${iThreadFileSmallIdx} small total ${iThreadFileSmallTotal}, size ${fThreadFileSmallSize}");
@ -591,14 +702,14 @@ sub backup_file
&log(DEBUG, " thread ${iThreadFileSmallIdx} small total ${iThreadFileSmallTotal}, size ${fThreadFileSmallSize}");
# End each thread queue and start the thread
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++)
{
$oThreadQueue[$iThreadIdx]->enqueue(undef);
$oThread[$iThreadIdx] = threads->create(\&backup_file_thread, $iThreadIdx, $bNoChecksum);
}
# Rejoin the threads
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadMax; $iThreadIdx++)
{
$oThread[$iThreadIdx]->join();
}
@ -774,6 +885,9 @@ sub backup
}
}
# Need some sort of backup validate - create a manifest and compare the backup manifest
# !!! DO IT
# Save the backup conf file final time
backup_manifest_save($strBackupConfFile, \%oBackupManifest);