1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-10-30 23:37:45 +02:00

Working on removing threads from instream compress/decompression.

This commit is contained in:
David Steele
2015-02-26 09:22:05 -05:00
parent d2602a5c07
commit b9378d94a2
5 changed files with 79 additions and 301 deletions

View File

@@ -23,7 +23,6 @@ use lib dirname($0) . '/../lib';
use BackRest::Exception;
use BackRest::Utility;
use BackRest::Remote;
use BackRest::ProcessAsync;
# Exports
use Exporter qw(import);
@@ -162,11 +161,6 @@ sub DEMOLISH
{
$self->{oRemote} = undef;
}
if (defined($self->{oProcessAsync}))
{
$self->{oProcessAsync} = undef;
}
}
####################################################################################################################################
@@ -189,21 +183,6 @@ sub clone
);
}
####################################################################################################################################
# PROCESS_ASYNC_GET
####################################################################################################################################
sub process_async_get
{
my $self = shift;
if (!defined($self->{oProcessAsync}))
{
$self->{oProcessAsync} = new BackRest::ProcessAsync;
}
return $self->{oProcessAsync};
}
####################################################################################################################################
# PATH_TYPE_GET
####################################################################################################################################
@@ -825,8 +804,7 @@ sub hash
if ($bCompressed)
{
$oSHA->addfile($self->process_async_get()->process_begin('decompress', $hFile));
$self->process_async_get()->process_end();
confess "CANNOT DECOMPRESS WITH MISSING REMOTE";
}
else
{

View File

@@ -1,222 +0,0 @@
####################################################################################################################################
# PROCESS ASYNC MODULE
####################################################################################################################################
package BackRest::ProcessAsync;
use threads;
use strict;
use warnings;
use Carp;
use Thread::Queue;
use File::Basename;
use IO::Handle;
use IO::Compress::Gzip qw(gzip $GzipError);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use lib dirname($0) . '/../lib';
use BackRest::Utility;
####################################################################################################################################
# CONSTRUCTOR
####################################################################################################################################
sub new
{
my $class = shift;
# Create the class hash
my $self = {};
bless $self, $class;
# Initialize thread and queues
$self->{oThreadQueue} = Thread::Queue->new();
$self->{oThreadResult} = Thread::Queue->new();
$self->{oThread} = threads->create(\&process_thread, $self);
return $self;
}
####################################################################################################################################
# THREAD_KILL
####################################################################################################################################
sub thread_kill
{
my $self = shift;
if (defined($self->{oThread}))
{
$self->{oThreadQueue}->enqueue(undef);
$self->{oThread}->join();
$self->{oThread} = undef;
}
}
####################################################################################################################################
# DESTRUCTOR
####################################################################################################################################
sub DESTROY
{
my $self = shift;
$self->thread_kill();
}
####################################################################################################################################
# PROCESS_THREAD
#
# De/Compresses/Checksum data on a thread.
####################################################################################################################################
sub process_thread
{
my $self = shift;
# When a KILL signal is received, immediately abort
$SIG{'KILL'} = sub {threads->exit();};
while (my $strMessage = $self->{oThreadQueue}->dequeue())
{
my @stryMessage = split(':', $strMessage);
my @strHandle = split(',', $stryMessage[1]);
my $hIn = IO::Handle->new_from_fd($strHandle[0], '<');
my $hOut = IO::Handle->new_from_fd($strHandle[1], '>');
$self->{oThreadResult}->enqueue('running');
if ($stryMessage[0] eq 'compress')
{
gzip($hIn => $hOut)
or confess &log(ERROR, 'unable to compress: ' . $GzipError);
}
elsif ($stryMessage[0] eq 'decompress')
{
gunzip($hIn => $hOut)
or confess &log(ERROR, 'unable to decompress: ' . $GunzipError);
}
close($hOut);
$self->{oThreadResult}->enqueue('complete');
}
}
####################################################################################################################################
# PROCESS_BEGIN
#
# Begins the de/compress/checksum operation
####################################################################################################################################
sub process_begin
{
my $self = shift;
my $strProcess = shift; # process to run (compress, decompress, checksum, size)
my $hHandle = shift; # Handle of the input or output
my $strDirection = shift; # Does hHandle represent in or out?
# Set/check value of strDirection
if (!defined($strDirection))
{
$strDirection = 'in';
}
else
{
if ($strDirection ne 'in' && $strDirection ne 'out')
{
confess &log(ASSERT, 'strDirection must be in (in, out) in ProcessAsync->process_begin()');
}
}
# Check if thread is already running
if (defined($self->{hPipeOut}))
{
confess &log(ASSERT, 'thread is already running in ProcessAsync->process_begin()');
}
# Validate strProcess
if (!defined($strProcess))
{
confess &log(ASSERT, 'strProcess must be defined in call to ProcessAsync->process_begin()');
}
if ($strProcess ne 'compress' && $strProcess ne 'decompress' && $strProcess ne 'checksum')
{
confess &log(ASSERT, 'strProcess must be (compress, decompress, checksum) in call to ProcessAsync->process_begin():' .
" ${strProcess} was passed");
}
# Validate hIn
if (!defined($hHandle))
{
confess &log(ASSERT, 'hHandle must be defined in call to ProcessAsync->process_begin()');
}
# Open the out/in pipes
my $hPipeOut;
my $hPipeIn;
pipe $hPipeOut, $hPipeIn;
# Queue the job in the thread
if ($strDirection eq 'in')
{
$self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hHandle) . ',' . fileno($hPipeIn));
}
else
{
$self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hPipeOut) . ',' . fileno($hHandle));
}
# Wait for the thread to acknowledge that it has duplicated the file handles
my $strMessage = $self->{oThreadResult}->dequeue();
# Close input pipe so that thread has the only copy
if ($strMessage eq 'running')
{
if ($strDirection eq 'in')
{
close($hPipeIn);
$self->{hPipe} = $hPipeOut;
}
else
{
close($hPipeOut);
$self->{hPipe} = $hPipeIn;
}
}
# If any other message is returned then error
else
{
confess "unknown thread message while waiting for running: ${strMessage}";
}
return $self->{hPipe};
}
####################################################################################################################################
# PROCESS_END
#
# Ends the de/compress/checksum operation
####################################################################################################################################
sub process_end
{
my $self = shift;
# Check if thread is not running
if (!defined($self->{hPipe}))
{
confess &log(ASSERT, 'thread is not running in ProcessAsync->process_end()');
}
# Make sure the de/compress pipes are closed
close($self->{hPipe});
$self->{hPipe} = undef;
# Wait for the thread to acknowledge that it has completed
my $strMessage = $self->{oThreadResult}->dequeue();
if ($strMessage ne 'complete')
{
confess "unknown thread message while waiting for complete: ${strMessage}";
}
}
1;

View File

@@ -13,11 +13,11 @@ use Net::OpenSSH;
use File::Basename;
use POSIX ':sys_wait_h';
use Scalar::Util 'blessed';
use IO::Compress::Gzip qw(gzip $GzipError);
use lib dirname($0) . '/../lib';
use BackRest::Exception;
use BackRest::Utility;
use BackRest::ProcessAsync;
use Exporter qw(import);
our @EXPORT = qw(DB BACKUP NONE);
@@ -114,11 +114,6 @@ sub new
sub thread_kill
{
my $self = shift;
if (defined($self->{oCompressAsync}))
{
$self->{oCompressAsync} = undef;
}
}
####################################################################################################################################
@@ -147,21 +142,6 @@ sub clone
);
}
####################################################################################################################################
# COMPRESS_ASYNC_GET
####################################################################################################################################
sub compress_async_get
{
my $self = shift;
if (!defined($self->{oCompressAsync}))
{
$self->{oCompressAsync} = new BackRest::ProcessAsync;
}
return $self->{oCompressAsync};
}
####################################################################################################################################
# GREETING_READ
#
@@ -369,7 +349,8 @@ sub wait_pid
####################################################################################################################################
# BINARY_XFER
#
# Copies data from one file handle to another, optionally compressing or decompressing the data in stream.
# Copies data from one file handle to another, optionally compressing or decompressing the data in stream. If $strRemote != none
# then one side is a protocol stream.
####################################################################################################################################
sub binary_xfer
{
@@ -396,11 +377,13 @@ sub binary_xfer
my $iBlockSize = $self->{iBlockSize};
my $iBlockIn;
my $iBlockInTotal = $iBlockSize;
my $iBlockBufferIn;
my $iBlockOut;
my $iBlockTotal = 0;
my $strBlockHeader;
my $strBlock;
my $bThreadRunning = false;
my $strBlockBuffer;
my $oGzip;
# Both the in and out streams must be defined
if (!defined($hIn) || !defined($hOut))
@@ -411,22 +394,20 @@ sub binary_xfer
# If this is output and the source is not already compressed
if ($strRemote eq 'out' && !$bSourceCompressed)
{
$hIn = $self->compress_async_get()->process_begin('compress', $hIn);
$bThreadRunning = true;
$oGzip = new IO::Compress::Gzip(\$strBlock);
}
# Spawn a child process to do decompression
# If this is input and the destination should be uncompressed
elsif ($strRemote eq 'in' && !$bDestinationCompress)
{
$hOut = $self->compress_async_get()->process_begin('decompress', $hOut, 'out');
$bThreadRunning = true;
# Open Unzip - TBD
}
while (1)
{
# Read from the protocol stream
if ($strRemote eq 'in')
{
# Read the block header (at start or when the previous block is complete)
if ($iBlockInTotal == $iBlockSize)
{
$strBlockHeader = $self->read_line($hIn);
@@ -437,12 +418,14 @@ sub binary_xfer
confess "unable to read block header ${strBlockHeader}";
}
$iBlockInTotal = 0;
# Parse the block size
$iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1));
$iBlockTotal += 1;
$iBlockInTotal = 0;
}
$iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1));
# If the block size > 0 then read from protocol stream. The entire block may not have been read before so keep reading
# until we get it all.
if ($iBlockSize != 0)
{
$iBlockIn = sysread($hIn, $strBlock, $iBlockSize - $iBlockInTotal);
@@ -458,22 +441,58 @@ sub binary_xfer
$iBlockInTotal += $iBlockIn;
}
# Else indicate the stream is complete
else
{
$iBlockIn = 0;
}
}
# Read from file input stream
else
{
$iBlockIn = sysread($hIn, $strBlock, $iBlockSize);
# Read a block from the input stream
$iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize);
if (!defined($iBlockIn))
if (!defined($iBlockBufferIn))
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
# Clear the compression output buffer
undef($strBlock);
# If new data was read from the input stream then compress
if ($iBlockBufferIn > 0)
{
$iBlockIn = $oGzip->syswrite($strBlockBuffer, $iBlockBufferIn);
if (!defined($iBlockIn) || $iBlockIn != $iBlockBufferIn)
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
}
# If there was nothing new to compress then flush
if (!defined($strBlock))
{
$oGzip->flush();
}
# If there is data in the compressed buffer, then output
if (defined($strBlock))
{
$iBlockIn = length($strBlock);
}
# Else indicate that the stream is complete
else
{
$iBlockIn = 0;
}
}
# Write block header to the protocal stream
if ($strRemote eq 'out')
{
$strBlockHeader = "block ${iBlockIn}\n";
@@ -487,6 +506,7 @@ sub binary_xfer
}
}
# Write block to output stream
if ($iBlockIn > 0)
{
$iBlockOut = syswrite($hOut, $strBlock, $iBlockIn);
@@ -497,16 +517,14 @@ sub binary_xfer
confess "unable to write ${iBlockIn} bytes" . (defined($!) ? ': ' . $! : '');
}
}
# When there is no more data to write then exit
else
{
last;
}
}
if ($bThreadRunning)
{
$self->compress_async_get()->process_end();
}
# &log(INFO, 'got to end');
}
####################################################################################################################################

View File

@@ -588,16 +588,16 @@ sub restore
}
# Create threads to process the thread queues
my $oThreadGroup = new BackRest::ThreadGroup();
my $oThreadGroup = thread_group_create();
for (my $iThreadIdx = 0; $iThreadIdx < $self->{iThreadTotal}; $iThreadIdx++)
{
&log(DEBUG, "starting restore thread ${iThreadIdx}");
$oThreadGroup->add(threads->create(\&restore_thread, $self, $iThreadIdx, \@oyRestoreQueue, $oManifest));
thread_group_add($oThreadGroup, threads->create(\&restore_thread, $self, $iThreadIdx, \@oyRestoreQueue, $oManifest));
}
# Complete thread queues
$oThreadGroup->complete();
thread_group_complete($oThreadGroup);
# Create recovery.conf file
$self->recovery();

View File

@@ -13,16 +13,20 @@ use File::Basename;
use lib dirname($0) . '/../lib';
use BackRest::Utility;
####################################################################################################################################
# MODULE EXPORTS
####################################################################################################################################
use Exporter qw(import);
our @EXPORT = qw(thread_group_create thread_group_add thread_group_complete);
####################################################################################################################################
# CONSTRUCTOR
####################################################################################################################################
sub new
sub thread_group_create
{
my $class = shift; # Class name
# Create the class hash
my $self = {};
bless $self, $class;
# Initialize variables
$self->{iThreadTotal} = 0;
@@ -35,7 +39,7 @@ sub new
#
# Add a thread to the group. Once a thread is added, it can be tracked as part of the group.
####################################################################################################################################
sub add
sub thread_group_add
{
my $self = shift;
my $oThread = shift;
@@ -51,7 +55,7 @@ sub add
#
# Wait for threads to complete.
####################################################################################################################################
sub complete
sub thread_group_complete
{
my $self = shift;
my $iTimeout = shift;
@@ -118,9 +122,9 @@ sub complete
}
####################################################################################################################################
# kill
# KILL
####################################################################################################################################
sub kill
sub thread_group_destroy
{
my $self = shift;
@@ -151,11 +155,11 @@ sub kill
####################################################################################################################################
# DESTRUCTOR
####################################################################################################################################
sub DEMOLISH
{
my $self = shift;
$self->kill();
}
# sub thread_group_destroy
# {
# my $self = shift;
#
# $self->kill();
# }
1;