mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-01-20 04:59:25 +02:00
3d0d308a98
Fixed one bug that was exposed.
166 lines
5.4 KiB
Perl
166 lines
5.4 KiB
Perl
####################################################################################################################################
|
|
# THREADGROUP MODULE
|
|
####################################################################################################################################
|
|
package BackRest::ThreadGroup;
|
|
|
|
use threads;
|
|
use strict;
|
|
use warnings FATAL => qw(all);
|
|
use Carp qw(confess);
|
|
|
|
use File::Basename;
|
|
|
|
use lib dirname($0) . '/../lib';
|
|
use BackRest::Utility;
|
|
|
|
####################################################################################################################################
|
|
# MODULE EXPORTS
|
|
####################################################################################################################################
|
|
use Exporter qw(import);
|
|
|
|
our @EXPORT = qw(thread_group_create thread_group_add thread_group_complete);
|
|
|
|
####################################################################################################################################
|
|
# CONSTRUCTOR
|
|
####################################################################################################################################
|
|
sub thread_group_create
|
|
{
|
|
# Create the class hash
|
|
my $self = {};
|
|
|
|
# Initialize variables
|
|
$self->{iThreadTotal} = 0;
|
|
|
|
return $self;
|
|
}
|
|
|
|
####################################################################################################################################
|
|
# ADD
|
|
#
|
|
# Add a thread to the group. Once a thread is added, it can be tracked as part of the group.
|
|
####################################################################################################################################
|
|
sub thread_group_add
|
|
{
|
|
my $self = shift;
|
|
my $oThread = shift;
|
|
|
|
$self->{oyThread}[$self->{iThreadTotal}] = $oThread;
|
|
$self->{iThreadTotal}++;
|
|
|
|
return $self->{iThreadTotal} - 1;
|
|
}
|
|
|
|
####################################################################################################################################
|
|
# COMPLETE
|
|
#
|
|
# Wait for threads to complete.
|
|
####################################################################################################################################
|
|
sub thread_group_complete
|
|
{
|
|
my $self = shift;
|
|
my $iTimeout = shift;
|
|
my $bConfessOnError = shift;
|
|
|
|
# Set defaults
|
|
$bConfessOnError = defined($bConfessOnError) ? $bConfessOnError : true;
|
|
|
|
# Wait for all threads to complete and handle errors
|
|
my $iThreadComplete = 0;
|
|
my $lTimeBegin = time();
|
|
|
|
# Rejoin the threads
|
|
while ($iThreadComplete < $self->{iThreadTotal})
|
|
{
|
|
hsleep(.1);
|
|
|
|
# If a timeout has been defined, make sure we have not been running longer than that
|
|
if (defined($iTimeout))
|
|
{
|
|
if (time() - $lTimeBegin >= $iTimeout)
|
|
{
|
|
confess &log(ERROR, "threads have been running more than ${iTimeout} seconds, exiting...");
|
|
|
|
#backup_thread_kill();
|
|
|
|
#confess &log(WARN, "all threads have exited, aborting...");
|
|
}
|
|
}
|
|
|
|
for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++)
|
|
{
|
|
if (defined($self->{oyThread}[$iThreadIdx]))
|
|
{
|
|
if (defined($self->{oyThread}[$iThreadIdx]->error()))
|
|
{
|
|
$self->kill();
|
|
|
|
if ($bConfessOnError)
|
|
{
|
|
confess &log(ERROR, 'error in thread ' . (${iThreadIdx} + 1) . ': check log for details');
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if ($self->{oyThread}[$iThreadIdx]->is_joinable())
|
|
{
|
|
&log(DEBUG, "thread ${iThreadIdx} exited");
|
|
$self->{oyThread}[$iThreadIdx]->join();
|
|
&log(TRACE, "thread ${iThreadIdx} object undef");
|
|
undef($self->{oyThread}[$iThreadIdx]);
|
|
$iThreadComplete++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
&log(DEBUG, 'all threads exited');
|
|
|
|
return true;
|
|
}
|
|
|
|
####################################################################################################################################
|
|
# KILL
|
|
####################################################################################################################################
|
|
sub thread_group_destroy
|
|
{
|
|
my $self = shift;
|
|
|
|
# Total number of threads killed
|
|
my $iTotal = 0;
|
|
|
|
for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++)
|
|
{
|
|
if (defined($self->{oyThread}[$iThreadIdx]))
|
|
{
|
|
if ($self->{oyThread}[$iThreadIdx]->is_running())
|
|
{
|
|
$self->{oyThread}[$iThreadIdx]->kill('KILL')->join();
|
|
}
|
|
elsif ($self->{oyThread}[$iThreadIdx]->is_joinable())
|
|
{
|
|
$self->{oyThread}[$iThreadIdx]->join();
|
|
}
|
|
|
|
undef($self->{oyThread}[$iThreadIdx]);
|
|
$iTotal++;
|
|
}
|
|
}
|
|
|
|
return($iTotal);
|
|
}
|
|
|
|
####################################################################################################################################
|
|
# DESTRUCTOR
|
|
####################################################################################################################################
|
|
# sub thread_group_destroy
|
|
# {
|
|
# my $self = shift;
|
|
#
|
|
# $self->kill();
|
|
# }
|
|
|
|
1;
|