1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-18 04:58:51 +02:00

Async compress, thread kill improvements

This commit is contained in:
David Steele 2014-02-15 14:18:15 -05:00
parent bd00fb7f0c
commit 0979841f1a
3 changed files with 238 additions and 78 deletions

View File

@ -129,30 +129,7 @@ sub config_load
####################################################################################################################################
sub safe_exit
{
my $iTotal = 0;
# for (my $iThreadIndex = 0; $iThreadIndex < scalar @pg_backrest_backup::oThread; $iThreadIndex++)
# {
# &log(INFO, "dequeueing thread ${iThreadIndex}");
#
# $pg_backrest_backup::oThreadQueue[$iThreadIndex]->dequeue_nb(10000000000000);
# $pg_backrest_backup::oThreadQueue[$iThreadIndex]->enqueue(undef);
# }
for (my $iThreadIndex = 0; $iThreadIndex < scalar @pg_backrest_backup::oThread; $iThreadIndex++)
{
&log(INFO, "joining thread ${iThreadIndex}");
$pg_backrest_backup::oThread[$iThreadIndex]->kill('KILL')->join();
undef($pg_backrest_backup::oThread[$iThreadIndex]);
$iTotal++;
}
# for (my $iIndex = 0; $iIndex < scalar @pg_backrest_db::oGlobalSSH; $iIndex++)
# {
# undef $pg_backrest_db::oGlobalSSH[$iIndex];
# $iTotal++;
# }
my $iTotal = backup_thread_kill();
confess &log(ERROR, "process was terminated on signal, ${iTotal} threads stopped");
}
@ -304,6 +281,34 @@ if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL)
my $bCompress = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_COMPRESS, true, "y") eq "y" ? true : false;
my $bChecksum = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_CHECKSUM, true, "y") eq "y" ? true : false;
# Do async compression
if ($bCompressAsync)
{
# Run file_init_archive - this is the minimal config needed to run archive pulling !!! need to close the old file
my $oFile = pg_backrest_file->new
(
strStanza => $strStanza,
bNoCompression => false,
strBackupPath => config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_PATH, true),
strCommandChecksum => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_CHECKSUM, $bChecksum),
strCommandCompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_COMPRESS, $bCompress),
strCommandDecompress => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_DECOMPRESS, $bCompress),
strCommandManifest => config_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_MANIFEST)
);
backup_init
(
undef,
$oFile,
undef,
undef,
!$bChecksum,
config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_THREAD_MAX)
);
archive_compress($strArchivePath . "/archive/${strStanza}");
}
# Run file_init_archive - this is the minimal config needed to run archive pulling !!! need to close the old file
my $oFile = pg_backrest_file->new
(

View File

@ -25,7 +25,7 @@ use pg_backrest_db;
use Exporter qw(import);
our @EXPORT = qw(backup_init archive_push archive_pull backup backup_expire archive_list_get);
our @EXPORT = qw(backup_init backup_thread_kill archive_push archive_pull archive_compress backup backup_expire archive_list_get);
my $oDb;
my @oFile;
@ -33,13 +33,14 @@ my $strType = "incremental"; # Type of backup: full, differential (diff),
my $bHardLink;
my $bNoChecksum;
my $iThreadMax;
my $iThreadLocalMax;
my $iThreadThreshold = 10;
my $iSmallFileThreshold = 65536;
my $bArchiveRequired;
# Thread variables
our @oThread;
our @oThreadQueue;
my @oThread;
my @oThreadQueue;
my %oFileCopyMap;
####################################################################################################################################
@ -55,6 +56,11 @@ sub backup_init
my $iThreadMaxParam = shift;
my $bArchiveRequiredParam = shift;
for (my $iFileIdx = 0; $iFileIdx < scalar @oFile; $iFileIdx++)
{
undef($oFile[$iFileIdx]);
}
$oDb = $oDbParam;
$oFile[0] = $oFileParam;
$strType = $strTypeParam;
@ -80,9 +86,9 @@ sub backup_init
sub thread_init
{
my $iThreadRequestTotal = shift; # Number of threads that were requested
my $iThreadActualTotal; # Number of actual threads assigned
if (!defined($iThreadRequestTotal))
{
$iThreadActualTotal = $iThreadMax;
@ -96,7 +102,7 @@ sub thread_init
$iThreadActualTotal = 1;
}
}
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadActualTotal; $iThreadIdx++)
{
if (!defined($oThreadQueue[$iThreadIdx]))
@ -113,6 +119,85 @@ sub thread_init
return $iThreadActualTotal;
}
####################################################################################################################################
# BACKUP_THREAD_KILL
####################################################################################################################################
sub backup_thread_kill
{
my $iTotal = 0;
for (my $iThreadIdx = 0; $iThreadIdx < scalar @oThread; $iThreadIdx++)
{
if (defined($oThread[$iThreadIdx]))
{
if ($oThread[$iThreadIdx]->is_running())
{
$oThread[$iThreadIdx]->kill('KILL')->join();
}
elsif ($oThread[$iThreadIdx]->is_joinable())
{
$oThread[$iThreadIdx]->join();
}
undef($oThread[$iThreadIdx]);
$iTotal++;
}
}
return($iTotal);
}
####################################################################################################################################
# BACKUP_THREAD_COMPLETE
####################################################################################################################################
sub backup_thread_complete
{
my $bConfessOnError = shift;
if (!defined($bConfessOnError))
{
$bConfessOnError = true;
}
# Wait for all threads to complete and handle errors
my $iThreadComplete = 0;
# Rejoin the threads
while ($iThreadComplete < $iThreadLocalMax)
{
sleep(1);
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
{
if (defined($oThread[$iThreadIdx]))
{
if (defined($oThread[$iThreadIdx]->error()))
{
backup_thread_kill();
if ($bConfessOnError)
{
confess &log(ERROR, "error in thread ${iThreadIdx}: check log for details");
}
else
{
return false;
}
}
if ($oThread[$iThreadIdx]->is_joinable())
{
$oThread[$iThreadIdx]->join();
undef($oThread[$iThreadIdx]);
$iThreadComplete++;
}
}
}
}
return true;
}
####################################################################################################################################
# ARCHIVE_PUSH
####################################################################################################################################
@ -143,7 +228,7 @@ sub archive_push
sub archive_pull
{
my $strArchivePath = shift;
my $strCompressAsync = shift;
my $bCompressAsync = shift;
# Load the archive manifest - all the files that need to be pushed
my %oManifestHash = $oFile[0]->manifest_get(PATH_DB_ABSOLUTE, $strArchivePath);
@ -175,12 +260,12 @@ sub archive_pull
&log(INFO, "archive to be copied to backup total ${lFileTotal}, size " . file_size_format($lFileSize));
# Init the thread variables
my $iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1);
my @oThread;
$iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1);
my $iThreadIdx = 0;
&log(DEBUG, "actual threads ${iThreadLocalMax}/${iThreadMax}");
# Distribute files among the threads
foreach my $strFile (sort @stryFile)
{
$oThreadQueue[$iThreadIdx]->enqueue($strFile);
@ -195,25 +280,9 @@ sub archive_pull
$oThread[$iThreadIdx] = threads->create(\&archive_pull_copy_thread, $iThreadIdx, $strArchivePath);
}
# Rejoin the threads
for ($iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
{
$oThread[$iThreadIdx]->join();
}
# If there are errors then compress
# If compress async then go and compress all uncompressed archive files
# if ($bCompressAsync)
# {
# # Find all the archive files
# foreach my $strFile (@stryFile)
# {
# &log(DEBUG, "SHOULD BE LOGGING ${strFile}");
# }
# }
backup_thread_complete();
# Return 1 indicating that processing should continue
return $lFileTotal;
}
@ -241,6 +310,78 @@ sub archive_pull_copy_thread
}
}
sub archive_compress
{
my $strArchivePath = shift;
# Load the archive manifest - all the files that need to be pushed
my %oManifestHash = $oFile[0]->manifest_get(PATH_DB_ABSOLUTE, $strArchivePath);
# Get all the files to be compressed and calculate the total size
my @stryFile;
my $lFileSize = 0;
my $lFileTotal = 0;
foreach my $strFile (sort(keys $oManifestHash{name}))
{
if ($strFile =~ /^[0-F]{16}\/[0-F]{24}(\-[0-f]+){0,1}$/)
{
push @stryFile, $strFile;
$lFileSize += $oManifestHash{name}{"$strFile"}{size};
$lFileTotal++;
}
}
if ($lFileTotal == 0)
{
&log(ERROR, "no archive logs to be compressed");
return;
}
# Output files to be compressed
&log(INFO, "archive to be compressed total ${lFileTotal}, size " . file_size_format($lFileSize));
# Init the thread variables
$iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1);
my $iThreadIdx = 0;
# Distribute files among the threads
foreach my $strFile (sort @stryFile)
{
$oThreadQueue[$iThreadIdx]->enqueue($strFile);
$iThreadIdx = ($iThreadIdx + 1 == $iThreadLocalMax) ? 0 : $iThreadIdx + 1;
}
# End each thread queue and start the thread
for ($iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
{
$oThreadQueue[$iThreadIdx]->enqueue(undef);
$oThread[$iThreadIdx] = threads->create(\&archive_pull_compress_thread, $iThreadIdx, $strArchivePath);
}
# Don't die on an error because we'd still like to try transferring
backup_thread_complete(false);
}
sub archive_pull_compress_thread
{
my @args = @_;
my $iThreadIdx = $args[0];
my $strArchivePath = $args[1];
while (my $strFile = $oThreadQueue[$iThreadIdx]->dequeue())
{
&log(INFO, "thread ${iThreadIdx} compressing archive file ${strFile}");
# Compress the file
$oFile[$iThreadIdx]->file_compress(PATH_DB_ABSOLUTE, "${strArchivePath}/${strFile}");
}
}
####################################################################################################################################
# BACKUP_REGEXP_GET - Generate a regexp depending on the backups that need to be found
####################################################################################################################################
@ -801,7 +942,7 @@ sub backup_file
}
# Build the thread queues
my $iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1);
$iThreadLocalMax = thread_init(int($lFileTotal / $iThreadThreshold) + 1);
&log(DEBUG, "actual threads ${iThreadLocalMax}/${iThreadMax}");
# Initialize the thread size array
@ -877,24 +1018,7 @@ sub backup_file
$oyThreadData[$iThreadIdx]{size});
}
# Wait for all threads to complete and handle errors
my $iThreadComplete = 0;
# Rejoin the threads
while ($iThreadComplete < $iThreadLocalMax)
{
sleep(1);
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
{
if ($oThread[$iThreadIdx]->is_joinable())
{
$oThread[$iThreadIdx]->join();
undef($oThread[$iThreadIdx]);
$iThreadComplete++;
}
}
}
backup_thread_complete();
# # Look for errors
# for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
@ -914,9 +1038,11 @@ sub backup_file_thread
my $bNoChecksum = $args[1];
my $bPathCreate = $args[2];
my $lSizeTotal = $args[3];
my $lSize = 0;
$SIG{'KILL'} = sub {threads->exit();};
while (my $strFile = $oThreadQueue[$iThreadIdx]->dequeue())
{
&log(INFO, "thread ${iThreadIdx} backing up file $oFileCopyMap{$strFile}{db_file} (" .
@ -1295,7 +1421,7 @@ sub backup_expire
if (!defined($strArchiveLast))
{
&log(INFO, "invalid archive location retrieved ${$strArchiveRetentionBackup}");
confess &log(INFO, "invalid archive location retrieved ${strArchiveRetentionBackup}");
}
&log(INFO, "archive retention starts at " . $strArchiveLast);

View File

@ -528,11 +528,11 @@ sub file_copy
# Generate the command string depending on compression/decompression/cat
my $strCommand = $self->{strCommandCat};
if ($bAlreadyCompressed && $bCompress)
{
$strDestination .= $strDestination =~ "^.*\.$self->{strCompressExtension}\$" ? ".gz" : "";
}
elsif (!$bAlreadyCompressed && $bCompress)
# if ($bAlreadyCompressed && $bCompress)
# {
# $strDestination .= $strDestination =~ "^.*\.$self->{strCompressExtension}\$" ? ".gz" : "";
# }
if (!$bAlreadyCompressed && $bCompress)
{
$strCommand = $self->{strCommandCompress};
$strDestination .= ".gz";
@ -680,6 +680,35 @@ sub file_hash_get
return trim(capture($strCommand)) or confess &log(ERROR, "unable to checksum ${strPath}");
}
####################################################################################################################################
# FILE_COMPRESS
####################################################################################################################################
sub file_compress
{
my $self = shift;
my $strPathType = shift;
my $strFile = shift;
# For now this operation is not supported remotely. Not currently needed.
if ($self->is_remote($strPathType))
{
confess &log(ASSERT, "remote operation not supported");
}
if (!defined($self->{strCommandCompress}))
{
confess &log(ASSERT, "\$strCommandChecksum not defined");
}
my $strPath = $self->path_get($strPathType, $strFile);
# Build the command
my $strCommand = $self->{strCommandCompress};
$strCommand =~ s/\%file\%/${strPath}/g;
$strCommand =~ s/\ \-\-stdout//g;
system($strCommand) == 0 or confess &log(ERROR, "unable to compress ${strPath}: ${strCommand}");
}
####################################################################################################################################
# FILE_LIST_GET