mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-01-20 04:59:25 +02:00
Revert "Abortive attempt at cleaning up some thread issues - I realized the issue is in mixing threads and objects too liberally. Trying another approach but want to keep this code for historical and reference purposes."
This reverts commit e95631f82ac8c15cb2492bb321703797be54eff6.
This commit is contained in:
parent
e95631f82a
commit
50e015a838
@ -75,7 +75,6 @@ pg_backrest.pl [options] [operation]
|
||||
|
||||
=cut
|
||||
|
||||
|
||||
####################################################################################################################################
|
||||
# Global variables
|
||||
####################################################################################################################################
|
||||
@ -125,14 +124,11 @@ sub remote_exit
|
||||
|
||||
if (defined($oRemote))
|
||||
{
|
||||
$oRemote = undef;
|
||||
$oRemote->thread_kill()
|
||||
}
|
||||
|
||||
BackRest::ThreadGroup::print();
|
||||
|
||||
if (defined($iExitCode))
|
||||
{
|
||||
&log(DEBUG, "process exited with code ${iExitCode}");
|
||||
exit $iExitCode;
|
||||
}
|
||||
}
|
||||
@ -265,8 +261,6 @@ if (operation_get() eq OP_ARCHIVE_PUSH)
|
||||
{
|
||||
&log(INFO, 'No fork on archive local for TESTING');
|
||||
}
|
||||
|
||||
$oFile = undef;
|
||||
}
|
||||
|
||||
# If no backup host is defined it makes no sense to run archive-push without a specified archive file so throw an error
|
||||
@ -335,8 +329,6 @@ if (operation_get() eq OP_ARCHIVE_PUSH)
|
||||
&log(DEBUG, 'no more logs to transfer - exiting');
|
||||
}
|
||||
}
|
||||
|
||||
$oFile = undef;
|
||||
#
|
||||
# };
|
||||
|
||||
@ -422,8 +414,6 @@ if (operation_get() eq OP_ARCHIVE_GET)
|
||||
|
||||
# Get the archive file
|
||||
remote_exit(archive_get(config_key_load(CONFIG_SECTION_STANZA, CONFIG_KEY_PATH), $ARGV[1], $ARGV[2]));
|
||||
|
||||
$oFile = undef;
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
@ -561,7 +551,6 @@ if (operation_get() eq OP_EXPIRE)
|
||||
lock_file_remove();
|
||||
}
|
||||
|
||||
$oFile->DESTROY();
|
||||
remote_exit(0);
|
||||
};
|
||||
|
||||
|
@ -1226,10 +1226,6 @@ sub backup_file
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# Create threads to process the thread queues
|
||||
my $oThreadGroup = new BackRest::ThreadGroup('backup');
|
||||
|
||||
# End each thread queue and start the backup_file threads
|
||||
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadLocalMax; $iThreadIdx++)
|
||||
{
|
||||
@ -1243,13 +1239,12 @@ sub backup_file
|
||||
$oThreadQueue[$iThreadIdx]->enqueue(undef);
|
||||
|
||||
# Start the thread
|
||||
$oThreadGroup->add(threads->create(\&backup_file_thread, $iThreadIdx, !$bNoChecksum, !$bPathCreate,
|
||||
$oyThreadData[$iThreadIdx]{size}));
|
||||
$oThread[$iThreadIdx] = threads->create(\&backup_file_thread, $iThreadIdx, !$bNoChecksum, !$bPathCreate,
|
||||
$oyThreadData[$iThreadIdx]{size});
|
||||
}
|
||||
|
||||
# Wait for the threads to complete
|
||||
# backup_thread_complete($iThreadTimeout);
|
||||
$oThreadGroup->complete(30);
|
||||
backup_thread_complete($iThreadTimeout);
|
||||
|
||||
# Read the messages that we passed back from the threads. These should be two types:
|
||||
# 1) remove - files that were skipped because they were removed from the database during backup
|
||||
@ -1380,7 +1375,6 @@ sub backup_file_thread
|
||||
}
|
||||
|
||||
&log(DEBUG, "thread ${iThreadIdx} exiting");
|
||||
# $oFileThread = undef;
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
@ -1681,11 +1675,7 @@ sub backup
|
||||
my $strFileLog = "pg_xlog/${strArchive}";
|
||||
|
||||
# Get the checksum and compare against the one already on log log file (if there is one)
|
||||
my $strChecksum = undef;
|
||||
|
||||
if (!$bNoChecksum)
|
||||
{
|
||||
$strChecksum = $oFile->hash(PATH_BACKUP_TMP, $strDestinationFile, $bCompress);
|
||||
my $strChecksum = $oFile->hash(PATH_BACKUP_TMP, $strDestinationFile, $bCompress);
|
||||
|
||||
if ($stryArchiveFile[0] =~ "^${strArchive}-[0-f]+(\\.$oFile->{strCompressExtension}){0,1}\$" &&
|
||||
$stryArchiveFile[0] !~ "^${strArchive}-${strChecksum}(\\.$oFile->{strCompressExtension}){0,1}\$")
|
||||
@ -1693,7 +1683,6 @@ sub backup
|
||||
confess &log(ERROR, "error copying log '$stryArchiveFile[0]' to backup - checksum recored with file does " .
|
||||
"not match actual checksum of '${strChecksum}'", ERROR_CHECKSUM);
|
||||
}
|
||||
}
|
||||
|
||||
# Set manifest values
|
||||
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_USER,
|
||||
@ -1703,13 +1692,9 @@ sub backup
|
||||
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_MODE, '0700');
|
||||
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_MODIFICATION_TIME, $lModificationTime);
|
||||
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_SIZE, 16777216);
|
||||
|
||||
if (defined($strChecksum))
|
||||
{
|
||||
$oBackupManifest->set($strFileSection, $strFileLog, MANIFEST_SUBKEY_CHECKSUM, $strChecksum);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Create the path for the new backup
|
||||
my $strBackupPath;
|
||||
|
@ -113,14 +113,10 @@ sub new
|
||||
my $strDefaultFilePermission = shift;
|
||||
my $iThreadIdx = shift;
|
||||
|
||||
&log(TRACE, 'BackRest::File create' . (defined($iThreadIdx) ? " 'thread ${iThreadIdx}'" : ''));
|
||||
|
||||
# Create the class hash
|
||||
my $self = {};
|
||||
bless $self, $class;
|
||||
|
||||
$self->{iThreadId} = threads->tid();
|
||||
|
||||
# Default compression extension to gz
|
||||
$self->{strCompressExtension} = 'gz';
|
||||
|
||||
@ -158,26 +154,19 @@ sub new
|
||||
####################################################################################################################################
|
||||
# DESTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub DESTROY
|
||||
sub DEMOLISH
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# if ($self->{iThreadId} != threads->tid())
|
||||
# {
|
||||
# return;
|
||||
# }
|
||||
if (defined($self->{oRemote}))
|
||||
{
|
||||
$self->{oRemote} = undef;
|
||||
}
|
||||
|
||||
&log(TRACE, 'BackRest::File destroy' . (defined($self->{iThreadIdx}) ? " 'thread $self->{iThreadIdx}'" : ''));
|
||||
|
||||
# if (defined($self->{oRemote}))
|
||||
# {
|
||||
# $self->{oRemote} = undef;
|
||||
# }
|
||||
#
|
||||
# if (defined($self->{oProcessAsync}))
|
||||
# {
|
||||
# $self->{oProcessAsync} = undef;
|
||||
# }
|
||||
if (defined($self->{oProcessAsync}))
|
||||
{
|
||||
$self->{oProcessAsync} = undef;
|
||||
}
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
|
@ -16,7 +16,6 @@ use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
|
||||
|
||||
use lib dirname($0) . '/../lib';
|
||||
use BackRest::Utility;
|
||||
use BackRest::ThreadGroup;
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
@ -29,16 +28,10 @@ sub new
|
||||
my $self = {};
|
||||
bless $self, $class;
|
||||
|
||||
&log(TRACE, ref($self) . " create [$self]");
|
||||
|
||||
$self->{iThreadId} = threads->tid();
|
||||
|
||||
# Initialize thread and queues
|
||||
$self->{oThreadQueue} = Thread::Queue->new();
|
||||
$self->{oThreadResult} = Thread::Queue->new();
|
||||
|
||||
$self->{oThreadGroup} = new BackRest::ThreadGroup('process_async');
|
||||
$self->{oThreadGroup}->add(threads->create(\&process_thread, $self));
|
||||
$self->{oThread} = threads->create(\&process_thread, $self);
|
||||
|
||||
return $self;
|
||||
}
|
||||
@ -46,18 +39,17 @@ sub new
|
||||
####################################################################################################################################
|
||||
# THREAD_KILL
|
||||
####################################################################################################################################
|
||||
# sub thread_kill
|
||||
# {
|
||||
# my $self = shift;
|
||||
#
|
||||
# if (defined($self->{oThreadGroup}))
|
||||
# {
|
||||
# $self->{oThreadQueue}->enqueue(undef);
|
||||
# $self->{oThreadGroup}->complete();
|
||||
# $self->{oThread}->join();
|
||||
# $self->{oThread} = undef;
|
||||
# }
|
||||
# }
|
||||
sub thread_kill
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
if (defined($self->{oThread}))
|
||||
{
|
||||
$self->{oThreadQueue}->enqueue(undef);
|
||||
$self->{oThread}->join();
|
||||
$self->{oThread} = undef;
|
||||
}
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# DESTRUCTOR
|
||||
@ -66,19 +58,7 @@ sub DESTROY
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
if ($self->{iThreadId} != threads->tid())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
&log(TRACE, ref($self) . " destroy [$self]");
|
||||
|
||||
if (defined($self->{oThreadGroup}))
|
||||
{
|
||||
$self->{oThreadQueue}->enqueue(undef);
|
||||
$self->{oThreadGroup}->complete();
|
||||
# $self->{oThreadGroup} = undef;
|
||||
}
|
||||
$self->thread_kill();
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
@ -90,9 +70,6 @@ sub process_thread
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# When a KILL signal is received, immediately abort
|
||||
$SIG{'KILL'} = sub {threads->exit();};
|
||||
|
||||
while (my $strMessage = $self->{oThreadQueue}->dequeue())
|
||||
{
|
||||
my @stryMessage = split(':', $strMessage);
|
||||
|
@ -55,8 +55,6 @@ sub new
|
||||
my $self = {};
|
||||
bless $self, $class;
|
||||
|
||||
&log(TRACE, ref($self) . " create");
|
||||
|
||||
# Create the greeting that will be used to check versions with the remote
|
||||
$self->{strGreeting} = 'PG_BACKREST_REMOTE ' . version_get();
|
||||
|
||||
@ -113,31 +111,24 @@ sub new
|
||||
####################################################################################################################################
|
||||
# THREAD_KILL
|
||||
####################################################################################################################################
|
||||
# sub thread_kill
|
||||
# {
|
||||
# my $self = shift;
|
||||
#
|
||||
# if (defined($self->{oCompressAsync}))
|
||||
# {
|
||||
# $self->{oCompressAsync} = undef;
|
||||
# }
|
||||
# }
|
||||
|
||||
####################################################################################################################################
|
||||
# DESTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub DESTROY
|
||||
sub thread_kill
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
&log(TRACE, 'destroy BackRest::Remote');
|
||||
|
||||
if (defined($self->{oCompressAsync}))
|
||||
{
|
||||
$self->{oCompressAsync} = undef;
|
||||
}
|
||||
}
|
||||
|
||||
# $self->thread_kill();
|
||||
####################################################################################################################################
|
||||
# DESTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub DEMOLISH
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
$self->thread_kill();
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
|
@ -584,7 +584,7 @@ sub restore
|
||||
}
|
||||
|
||||
# Create threads to process the thread queues
|
||||
my $oThreadGroup = new BackRest::ThreadGroup('restore');
|
||||
my $oThreadGroup = new BackRest::ThreadGroup();
|
||||
|
||||
for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++)
|
||||
{
|
||||
|
@ -4,61 +4,28 @@
|
||||
package BackRest::ThreadGroup;
|
||||
|
||||
use threads;
|
||||
use threads::shared;
|
||||
use strict;
|
||||
use warnings;
|
||||
use Carp;
|
||||
|
||||
use File::Basename;
|
||||
use Time::HiRes qw(gettimeofday usleep);
|
||||
|
||||
use lib dirname($0) . '/../lib';
|
||||
use BackRest::Utility;
|
||||
|
||||
####################################################################################################################################
|
||||
# Shared for all objects
|
||||
####################################################################################################################################
|
||||
my %oThreadGroupHash :shared; # Stores active thread groups
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $strClass = shift; # Class name
|
||||
my $strDescription = shift; # Description of the group
|
||||
|
||||
# strDescription must be defined
|
||||
if (!defined($strDescription))
|
||||
{
|
||||
confess &log(ASSERT, 'strDescription is not defined');
|
||||
}
|
||||
|
||||
# Create the label based on time and description
|
||||
hsleep(.001);
|
||||
my $strLabel = '[' . gettimeofday() . '] ' . $strDescription;
|
||||
my $class = shift; # Class name
|
||||
|
||||
# Create the class hash
|
||||
my $self = {};
|
||||
bless $self, $strClass;
|
||||
|
||||
&log(TRACE, ref($self) . " create [$self] '${strLabel}'");
|
||||
bless $self, $class;
|
||||
|
||||
# Initialize variables
|
||||
$self->{iThreadTotal} = 0;
|
||||
$self->{iThreadId} = threads->tid();
|
||||
|
||||
# Lock the group has so that mods are synchronized
|
||||
lock(%oThreadGroupHash);
|
||||
|
||||
# Add the time to the description and store it as the label, making sure there are no collisions
|
||||
if (defined($oThreadGroupHash{$strLabel}))
|
||||
{
|
||||
confess &log(ASSERT, "collision detected in thread group '${strLabel}'");
|
||||
}
|
||||
|
||||
$self->{strLabel} = shared_clone($strLabel);
|
||||
$oThreadGroupHash{$self->{strLabel}} = true;
|
||||
|
||||
return $self;
|
||||
}
|
||||
@ -73,8 +40,6 @@ sub add
|
||||
my $self = shift;
|
||||
my $oThread = shift;
|
||||
|
||||
&log(TRACE, ref($self) . " add [$self], thread [$oThread]");
|
||||
|
||||
$self->{oyThread}[$self->{iThreadTotal}] = $oThread;
|
||||
$self->{iThreadTotal}++;
|
||||
|
||||
@ -102,9 +67,7 @@ sub complete
|
||||
# Rejoin the threads
|
||||
while ($iThreadComplete < $self->{iThreadTotal})
|
||||
{
|
||||
# hsleep(.1);
|
||||
hsleep(1);
|
||||
&log(TRACE, 'waiting for threads to exit');
|
||||
hsleep(.1);
|
||||
|
||||
# If a timeout has been defined, make sure we have not been running longer than that
|
||||
if (defined($iTimeout))
|
||||
@ -125,7 +88,7 @@ sub complete
|
||||
{
|
||||
if (defined($self->{oyThread}[$iThreadIdx]->error()))
|
||||
{
|
||||
# $self->kill();
|
||||
$self->kill();
|
||||
|
||||
if ($bConfessOnError)
|
||||
{
|
||||
@ -177,7 +140,7 @@ sub kill
|
||||
$self->{oyThread}[$iThreadIdx]->join();
|
||||
}
|
||||
|
||||
$self->{oyThread}[$iThreadIdx] = undef;
|
||||
undef($self->{oyThread}[$iThreadIdx]);
|
||||
$iTotal++;
|
||||
}
|
||||
}
|
||||
@ -188,43 +151,11 @@ sub kill
|
||||
####################################################################################################################################
|
||||
# DESTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub DESTROY
|
||||
sub DEMOLISH
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
if ($self->{iThreadId} != threads->tid())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
&log(TRACE, "BackRest::ThreadGroup destroy [$self] '$self->{strLabel}'");
|
||||
|
||||
$self->kill();
|
||||
|
||||
# Lock the group hash and delete the current group
|
||||
lock(%oThreadGroupHash);
|
||||
|
||||
if (!defined($oThreadGroupHash{$self->{strLabel}}))
|
||||
{
|
||||
confess &log(ASSERT, "BackRest::ThreadGroup [$self] '$self->{strLabel}' has already been destroyed");
|
||||
}
|
||||
|
||||
delete($oThreadGroupHash{$self->{strLabel}});
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# print
|
||||
####################################################################################################################################
|
||||
sub print
|
||||
{
|
||||
# Lock the group hash
|
||||
lock(%oThreadGroupHash);
|
||||
|
||||
# print all active groups
|
||||
foreach my $strLabel (sort(keys %oThreadGroupHash))
|
||||
{
|
||||
&log(TRACE, "BackRest::ThreadGroup active '${strLabel}'");
|
||||
}
|
||||
}
|
||||
|
||||
1;
|
||||
|
@ -180,8 +180,6 @@ sub BackRestTestCommon_ExecuteEnd
|
||||
# Check the exit status and output an error if needed
|
||||
my $iExitStatus = ${^CHILD_ERROR_NATIVE} >> 8;
|
||||
|
||||
&log(TRACE, "command exited with status ${iExitStatus}");
|
||||
|
||||
if (defined($iExpectedExitStatus) && $iExitStatus == $iExpectedExitStatus)
|
||||
{
|
||||
return $iExitStatus;
|
||||
@ -521,7 +519,7 @@ sub BackRestTestCommon_ConfigCreate
|
||||
}
|
||||
|
||||
$oParamHash{'global:log'}{'level-console'} = 'error';
|
||||
$oParamHash{'global:log'}{'level-file'} = 'trace';
|
||||
$oParamHash{'global:log'}{'level-file'} = 'debug';
|
||||
|
||||
if ($strLocal eq BACKUP)
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user