1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-18 04:58:51 +02:00

Abortive attempt at cleaning up some thread issues - I realized the issue is in mixing threads and objects too liberally. Trying another approach but want to keep this code for historical and reference purposes.

This commit is contained in:
David Steele 2015-01-30 14:55:55 -05:00
parent fb934ecce9
commit e95631f82a
8 changed files with 190 additions and 50 deletions

View File

@ -75,6 +75,7 @@ pg_backrest.pl [options] [operation]
=cut
####################################################################################################################################
# Global variables
####################################################################################################################################
@ -124,11 +125,14 @@ sub remote_exit
if (defined($oRemote))
{
$oRemote->thread_kill()
$oRemote = undef;
}
BackRest::ThreadGroup::print();
if (defined($iExitCode))
{
&log(DEBUG, "process exited with code ${iExitCode}");
exit $iExitCode;
}
}
@ -261,6 +265,8 @@ if (operation_get() eq OP_ARCHIVE_PUSH)
{
&log(INFO, 'No fork on archive local for TESTING');
}
$oFile = undef;
}
# If no backup host is defined it makes no sense to run archive-push without a specified archive file so throw an error
@ -329,6 +335,8 @@ if (operation_get() eq OP_ARCHIVE_PUSH)
&log(DEBUG, 'no more logs to transfer - exiting');
}
}
$oFile = undef;
#
# };
@ -414,6 +422,8 @@ if (operation_get() eq OP_ARCHIVE_GET)
# Get the archive file
remote_exit(archive_get(config_key_load(CONFIG_SECTION_STANZA, CONFIG_KEY_PATH), $ARGV[1], $ARGV[2]));
$oFile = undef;
}
####################################################################################################################################
@ -551,6 +561,7 @@ if (operation_get() eq OP_EXPIRE)
lock_file_remove();
}
$oFile->DESTROY();
remote_exit(0);
};

View File

@ -1226,6 +1226,10 @@ sub backup_file
}
}
# Create threads to process the thread queues
my $oThreadGroup = new BackRest::ThreadGroup('backup');
# End each thread queue and start the backup_file threads
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
{
@ -1239,12 +1243,13 @@ sub backup_file
$oThreadQueue[$iThreadIdx]->enqueue(undef);
# Start the thread
$oThread[$iThreadIdx] = threads->create(\&backup_file_thread, $iThreadIdx, !$bNoChecksum, !$bPathCreate,
$oyThreadData[$iThreadIdx]{size});
$oThreadGroup->add(threads->create(\&backup_file_thread, $iThreadIdx, !$bNoChecksum, !$bPathCreate,
$oyThreadData[$iThreadIdx]{size}));
}
# Wait for the threads to complete
backup_thread_complete($iThreadTimeout);
# backup_thread_complete($iThreadTimeout);
$oThreadGroup->complete(30);
# Read the messages that we passed back from the threads. These should be two types:
# 1) remove - files that were skipped because they were removed from the database during backup
@ -1375,6 +1380,7 @@ sub backup_file_thread
}
&log(DEBUG, "thread ${iThreadIdx} exiting");
# $oFileThread = undef;
}
####################################################################################################################################
@ -1675,13 +1681,18 @@ sub backup
my $strFileLog = "pg_xlog/${strArchive}";
# Get the checksum and compare against the one already on log log file (if there is one)
my $strChecksum = $oFile->hash(PATH_BACKUP_TMP, $strDestinationFile, $bCompress);
my $strChecksum = undef;
if ($stryArchiveFile[0] =~ "^${strArchive}-[0-f]+(\\.$oFile->{strCompressExtension}){0,1}\$" &&
$stryArchiveFile[0] !~ "^${strArchive}-${strChecksum}(\\.$oFile->{strCompressExtension}){0,1}\$")
if (!$bNoChecksum)
{
confess &log(ERROR, "error copying log '$stryArchiveFile[0]' to backup - checksum recored with file does " .
"not match actual checksum of '${strChecksum}'", ERROR_CHECKSUM);
$strChecksum = $oFile->hash(PATH_BACKUP_TMP, $strDestinationFile, $bCompress);
if ($stryArchiveFile[0] =~ "^${strArchive}-[0-f]+(\\.$oFile->{strCompressExtension}){0,1}\$" &&
$stryArchiveFile[0] !~ "^${strArchive}-${strChecksum}(\\.$oFile->{strCompressExtension}){0,1}\$")
{
confess &log(ERROR, "error copying log '$stryArchiveFile[0]' to backup - checksum recored with file does " .
"not match actual checksum of '${strChecksum}'", ERROR_CHECKSUM);
}
}
# Set manifest values
@ -1692,7 +1703,11 @@ sub backup
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_MODE, '0700');
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_MODIFICATION_TIME, $lModificationTime);
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_SIZE, 16777216);
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_CHECKSUM, $strChecksum);
if (defined($strChecksum))
{
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_CHECKSUM, $strChecksum);
}
}
}

View File

@ -113,10 +113,14 @@ sub new
my $strDefaultFilePermission = shift;
my $iThreadIdx = shift;
&log(TRACE, 'BackRest::File create' . (defined($iThreadIdx) ? " 'thread ${iThreadIdx}'" : ''));
# Create the class hash
my $self = {};
bless $self, $class;
$self->{iThreadId} = threads->tid();
# Default compression extension to gz
$self->{strCompressExtension} = 'gz';
@ -154,19 +158,26 @@ sub new
####################################################################################################################################
# DESTRUCTOR
####################################################################################################################################
sub DEMOLISH
sub DESTROY
{
my $self = shift;
if (defined($self->{oRemote}))
{
$self->{oRemote} = undef;
}
# if ($self->{iThreadId} != threads->tid())
# {
# return;
# }
if (defined($self->{oProcessAsync}))
{
$self->{oProcessAsync} = undef;
}
&log(TRACE, 'BackRest::File destroy' . (defined($self->{iThreadIdx}) ? " 'thread $self->{iThreadIdx}'" : ''));
# if (defined($self->{oRemote}))
# {
# $self->{oRemote} = undef;
# }
#
# if (defined($self->{oProcessAsync}))
# {
# $self->{oProcessAsync} = undef;
# }
}
####################################################################################################################################

View File

@ -16,6 +16,7 @@ use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use lib dirname($0) . '/../lib';
use BackRest::Utility;
use BackRest::ThreadGroup;
####################################################################################################################################
# CONSTRUCTOR
@ -28,10 +29,16 @@ sub new
my $self = {};
bless $self, $class;
&log(TRACE, ref($self) . " create [$self]");
$self->{iThreadId} = threads->tid();
# Initialize thread and queues
$self->{oThreadQueue} = Thread::Queue->new();
$self->{oThreadResult} = Thread::Queue->new();
$self->{oThread} = threads->create(\&process_thread, $self);
$self->{oThreadGroup} = new BackRest::ThreadGroup('process_async');
$self->{oThreadGroup}->add(threads->create(\&process_thread, $self));
return $self;
}
@ -39,17 +46,18 @@ sub new
####################################################################################################################################
# THREAD_KILL
####################################################################################################################################
sub thread_kill
{
my $self = shift;
if (defined($self->{oThread}))
{
$self->{oThreadQueue}->enqueue(undef);
$self->{oThread}->join();
$self->{oThread} = undef;
}
}
# sub thread_kill
# {
# my $self = shift;
#
# if (defined($self->{oThreadGroup}))
# {
# $self->{oThreadQueue}->enqueue(undef);
# $self->{oThreadGroup}->complete();
# $self->{oThread}->join();
# $self->{oThread} = undef;
# }
# }
####################################################################################################################################
# DESTRUCTOR
@ -58,7 +66,19 @@ sub DESTROY
{
my $self = shift;
$self->thread_kill();
if ($self->{iThreadId} != threads->tid())
{
return;
}
&log(TRACE, ref($self) . " destroy [$self]");
if (defined($self->{oThreadGroup}))
{
$self->{oThreadQueue}->enqueue(undef);
$self->{oThreadGroup}->complete();
# $self->{oThreadGroup} = undef;
}
}
####################################################################################################################################
@ -70,6 +90,9 @@ sub process_thread
{
my $self = shift;
# When a KILL signal is received, immediately abort
$SIG{'KILL'} = sub {threads->exit();};
while (my $strMessage = $self->{oThreadQueue}->dequeue())
{
my @stryMessage = split(':', $strMessage);

View File

@ -55,6 +55,8 @@ sub new
my $self = {};
bless $self, $class;
&log(TRACE, ref($self) . " create");
# Create the greeting that will be used to check versions with the remote
$self->{strGreeting} = 'PG_BACKREST_REMOTE ' . version_get();
@ -111,24 +113,31 @@ sub new
####################################################################################################################################
# THREAD_KILL
####################################################################################################################################
sub thread_kill
# sub thread_kill
# {
# my $self = shift;
#
# if (defined($self->{oCompressAsync}))
# {
# $self->{oCompressAsync} = undef;
# }
# }
####################################################################################################################################
# DESTRUCTOR
####################################################################################################################################
sub DESTROY
{
my $self = shift;
&log(TRACE, 'destroy BackRest::Remote');
if (defined($self->{oCompressAsync}))
{
$self->{oCompressAsync} = undef;
}
}
####################################################################################################################################
# DESTRUCTOR
####################################################################################################################################
sub DEMOLISH
{
my $self = shift;
$self->thread_kill();
# $self->thread_kill();
}
####################################################################################################################################

View File

@ -584,7 +584,7 @@ sub restore
}
# Create threads to process the thread queues
my $oThreadGroup = new BackRest::ThreadGroup();
my $oThreadGroup = new BackRest::ThreadGroup('restore');
for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++)
{

View File

@ -4,28 +4,61 @@
package BackRest::ThreadGroup;
use threads;
use threads::shared;
use strict;
use warnings;
use Carp;
use File::Basename;
use Time::HiRes qw(gettimeofday usleep);
use lib dirname($0) . '/../lib';
use BackRest::Utility;
####################################################################################################################################
# Shared for all objects
####################################################################################################################################
my %oThreadGroupHash :shared; # Stores active thread groups
####################################################################################################################################
# CONSTRUCTOR
####################################################################################################################################
sub new
{
my $class = shift; # Class name
my $strClass = shift; # Class name
my $strDescription = shift; # Description of the group
# strDescription must be defined
if (!defined($strDescription))
{
confess &log(ASSERT, 'strDescription is not defined');
}
# Create the label based on time and description
hsleep(.001);
my $strLabel = '[' . gettimeofday() . '] ' . $strDescription;
# Create the class hash
my $self = {};
bless $self, $class;
bless $self, $strClass;
&log(TRACE, ref($self) . " create [$self] '${strLabel}'");
# Initialize variables
$self->{iThreadTotal} = 0;
$self->{iThreadId} = threads->tid();
# Lock the group has so that mods are synchronized
lock(%oThreadGroupHash);
# Add the time to the description and store it as the label, making sure there are no collisions
if (defined($oThreadGroupHash{$strLabel}))
{
confess &log(ASSERT, "collision detected in thread group '${strLabel}'");
}
$self->{strLabel} = shared_clone($strLabel);
$oThreadGroupHash{$self->{strLabel}} = true;
return $self;
}
@ -40,6 +73,8 @@ sub add
my $self = shift;
my $oThread = shift;
&log(TRACE, ref($self) . " add [$self], thread [$oThread]");
$self->{oyThread}[$self->{iThreadTotal}] = $oThread;
$self->{iThreadTotal}++;
@ -67,7 +102,9 @@ sub complete
# Rejoin the threads
while ($iThreadComplete < $self->{iThreadTotal})
{
hsleep(.1);
# hsleep(.1);
hsleep(1);
&log(TRACE, 'waiting for threads to exit');
# If a timeout has been defined, make sure we have not been running longer than that
if (defined($iTimeout))
@ -88,7 +125,7 @@ sub complete
{
if (defined($self->{oyThread}[$iThreadIdx]->error()))
{
$self->kill();
# $self->kill();
if ($bConfessOnError)
{
@ -140,7 +177,7 @@ sub kill
$self->{oyThread}[$iThreadIdx]->join();
}
undef($self->{oyThread}[$iThreadIdx]);
$self->{oyThread}[$iThreadIdx] = undef;
$iTotal++;
}
}
@ -151,11 +188,43 @@ sub kill
####################################################################################################################################
# DESTRUCTOR
####################################################################################################################################
sub DEMOLISH
sub DESTROY
{
my $self = shift;
if ($self->{iThreadId} != threads->tid())
{
return;
}
&log(TRACE, "BackRest::ThreadGroup destroy [$self] '$self->{strLabel}'");
$self->kill();
# Lock the group hash and delete the current group
lock(%oThreadGroupHash);
if (!defined($oThreadGroupHash{$self->{strLabel}}))
{
confess &log(ASSERT, "BackRest::ThreadGroup [$self] '$self->{strLabel}' has already been destroyed");
}
delete($oThreadGroupHash{$self->{strLabel}});
}
####################################################################################################################################
# print
####################################################################################################################################
sub print
{
# Lock the group hash
lock(%oThreadGroupHash);
# print all active groups
foreach my $strLabel (sort(keys %oThreadGroupHash))
{
&log(TRACE, "BackRest::ThreadGroup active '${strLabel}'");
}
}
1;

View File

@ -180,6 +180,8 @@ sub BackRestTestCommon_ExecuteEnd
# Check the exit status and output an error if needed
my $iExitStatus = ${^CHILD_ERROR_NATIVE} >> 8;
&log(TRACE, "command exited with status ${iExitStatus}");
if (defined($iExpectedExitStatus) && $iExitStatus == $iExpectedExitStatus)
{
return $iExitStatus;
@ -519,7 +521,7 @@ sub BackRestTestCommon_ConfigCreate
}
$oParamHash{'global:log'}{'level-console'} = 'error';
$oParamHash{'global:log'}{'level-file'} = 'debug';
$oParamHash{'global:log'}{'level-file'} = 'trace';
if ($strLocal eq BACKUP)
{