From 3d86adadb5a7d3cba91e6839721681d3b1237f37 Mon Sep 17 00:00:00 2001 From: David Steele Date: Wed, 8 Oct 2014 13:54:31 -0400 Subject: [PATCH] Remote object now using ProcessAsync for de/compression. Removed dependency on Moose from Remote. --- INSTALL.md | 2 +- bin/pg_backrest.pl | 11 +- lib/BackRest/ProcessAsync.pm | 64 ++++++--- lib/BackRest/Remote.pm | 214 ++++++---------------------- test/lib/BackRestTest/BackupTest.pm | 6 +- test/lib/BackRestTest/CommonTest.pm | 19 +-- test/lib/BackRestTest/FileTest.pm | 8 +- 7 files changed, 117 insertions(+), 207 deletions(-) diff --git a/INSTALL.md b/INSTALL.md index f72e53ff1..b4eed6517 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -95,7 +95,7 @@ path=/var/lib/postgresql/9.3/main #### simple multiple host install -This configuration is appropriate for a small installation where backups are being made remotely. Make sure that postgres@db-host has trusted ssh to backrest@backup-host and vice versa. +This configuration is appropriate for a small installation where backups are being made remotely. Make sure that postgres@db-host has trusted ssh to backrest@backup-host and vice versa. This configuration assumes that you have pg_backrest_remote.pl and pg_backrest.pl in the same path on both servers. `/etc/pg_backrest.conf on the db host`: ``` diff --git a/bin/pg_backrest.pl b/bin/pg_backrest.pl index baa18551e..5e36d1daa 100755 --- a/bin/pg_backrest.pl +++ b/bin/pg_backrest.pl @@ -244,14 +244,11 @@ sub remote_get() { if (!defined($oRemote) && $strRemote ne REMOTE_NONE) { - $oRemote = BackRest::Remote->new + $oRemote = new BackRest::Remote ( - strHost => - config_key_load($strRemote eq REMOTE_DB ? CONFIG_SECTION_STANZA : CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST, true), - strUser => - config_key_load($strRemote eq REMOTE_DB ? CONFIG_SECTION_STANZA : CONFIG_SECTION_BACKUP, CONFIG_KEY_USER, true), - strCommand => - config_key_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_REMOTE, true) + config_key_load($strRemote eq REMOTE_DB ? CONFIG_SECTION_STANZA : CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST, true), + config_key_load($strRemote eq REMOTE_DB ? CONFIG_SECTION_STANZA : CONFIG_SECTION_BACKUP, CONFIG_KEY_USER, true), + config_key_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_REMOTE, true) ); } diff --git a/lib/BackRest/ProcessAsync.pm b/lib/BackRest/ProcessAsync.pm index 9f3ea2bf7..da5392c91 100644 --- a/lib/BackRest/ProcessAsync.pm +++ b/lib/BackRest/ProcessAsync.pm @@ -17,22 +17,21 @@ use IO::Uncompress::Gunzip qw(gunzip $GunzipError); use lib dirname($0) . '/../lib'; use BackRest::Utility; -#use Exporter qw(import); -#our @EXPORT = qw(new DESTROY thread_kill process_begin process_end); - #################################################################################################################################### # CONSTRUCTOR #################################################################################################################################### sub new { my $class = shift; + + # Create the class hash my $self = {}; + bless $self, $class; $self->{oThreadQueue} = Thread::Queue->new(); $self->{oThreadResult} = Thread::Queue->new(); $self->{oThread} = threads->create(\&process_thread, $self); - bless $self, $class; return $self; } @@ -105,8 +104,22 @@ sub process_thread sub process_begin { my $self = shift; - my $strProcess = shift; - my $hIn = shift; + my $strProcess = shift; # process to run (compress, decompress, checksum, size) + my $hHandle = shift; # Handle of the input or output + my $strDirection = shift; # Does hHandle represent in or out? + + # Set/check value of strDirection + if (!defined($strDirection)) + { + $strDirection = 'in'; + } + else + { + if ($strDirection ne 'in' && $strDirection ne 'out') + { + confess &log(ASSERT, 'strDirection must be in (in, out) in ProcessAsync->process_begin()'); + } + } # Check if thread is already running if (defined($self->{hPipeOut})) @@ -127,18 +140,26 @@ sub process_begin } # Validate hIn - if (!defined($hIn)) + if (!defined($hHandle)) { - confess &log(ASSERT, 'strProcess must be defined in call to ProcessAsync->process_begin()'); + confess &log(ASSERT, 'hHandle must be defined in call to ProcessAsync->process_begin()'); } - # Open the in/out pipes + # Open the out/in pipes + my $hPipeOut; my $hPipeIn; - pipe $self->{hPipeOut}, $hPipeIn; + pipe $hPipeOut, $hPipeIn; - # Queue the queue job with the thread - $self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hIn) . ',' . fileno($hPipeIn)); + # Queue the job in the thread + if ($strDirection eq 'in') + { + $self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hHandle) . ',' . fileno($hPipeIn)); + } + else + { + $self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hPipeOut) . ',' . fileno($hHandle)); + } # Wait for the thread to acknowledge that it has duplicated the file handles my $strMessage = $self->{oThreadResult}->dequeue(); @@ -146,7 +167,16 @@ sub process_begin # Close input pipe so that thread has the only copy if ($strMessage eq 'running') { - close($hPipeIn); + if ($strDirection eq 'in') + { + close($hPipeIn); + $self->{hPipe} = $hPipeOut; + } + else + { + close($hPipeOut); + $self->{hPipe} = $hPipeIn; + } } # If any other message is returned then error else @@ -154,7 +184,7 @@ sub process_begin confess "unknown thread message while waiting for running: ${strMessage}"; } - return $self->{hPipeOut}; + return $self->{hPipe}; } #################################################################################################################################### @@ -167,14 +197,14 @@ sub process_end my $self = shift; # Check if thread is not running - if (!defined($self->{hPipeOut})) + if (!defined($self->{hPipe})) { confess &log(ASSERT, 'thread is not running in ProcessAsync->process_end()'); } # Make sure the de/compress pipes are closed - close($self->{hPipeOut}); - $self->{hPipeOut} = undef; + close($self->{hPipe}); + $self->{hPipe} = undef; # Wait for the thread to acknowledge that it has completed my $strMessage = $self->{oThreadResult}->dequeue(); diff --git a/lib/BackRest/Remote.pm b/lib/BackRest/Remote.pm index 01196b046..e6c378289 100644 --- a/lib/BackRest/Remote.pm +++ b/lib/BackRest/Remote.pm @@ -8,18 +8,16 @@ use strict; use warnings; use Carp; -use Moose; -use Thread::Queue; +use Scalar::Util; use Net::OpenSSH; use File::Basename; -use IO::Handle; use POSIX ':sys_wait_h'; -use IO::Compress::Gzip qw(gzip $GzipError); -use IO::Uncompress::Gunzip qw(gunzip $GunzipError); +use Scalar::Util 'blessed'; use lib dirname($0) . '/../lib'; use BackRest::Exception; use BackRest::Utility; +use BackRest::ProcessAsync; #################################################################################################################################### # Remote xfer default block size constant @@ -29,58 +27,53 @@ use constant DEFAULT_BLOCK_SIZE => 1048576 }; -#################################################################################################################################### -# Module variables -#################################################################################################################################### -# Protocol strings -has strGreeting => (is => 'ro', default => 'PG_BACKREST_REMOTE'); - -# Command strings -has strCommand => (is => 'bare'); - -# Module variables -has strHost => (is => 'bare'); # Host host -has strUser => (is => 'bare'); # User user -has oSSH => (is => 'bare'); # SSH object - -# Process variables -has pId => (is => 'bare'); # Process Id -has hIn => (is => 'bare'); # Input stream -has hOut => (is => 'bare'); # Output stream -has hErr => (is => 'bare'); # Error stream - -# Thread variables -has iThreadIdx => (is => 'bare'); # Thread index -has oThread => (is => 'bare'); # Thread object -has oThreadQueue => (is => 'bare'); # Thread queue object -has oThreadResult => (is => 'bare'); # Thread result object - -# Block size -has iBlockSize => (is => 'bare', default => DEFAULT_BLOCK_SIZE); # Set block size to default - #################################################################################################################################### # CONSTRUCTOR #################################################################################################################################### -sub BUILD +sub new { - my $self = shift; + my $class = shift; # Class name + my $strHost = shift; # Host to connect to for remote (optional as this can also be used on the remote) + my $strUser = shift; # User to connect to for remote (must be set if strHost is set) + my $strCommand = shift; # Command to execute on remote + my $iBlockSize = shift; # Optionally, set the block size (defaults to DEFAULT_BLOCK_SIZE) - $self->{strGreeting} .= ' ' . version_get(); + # Create the class hash + my $self = {}; + bless $self, $class; - if (defined($self->{strHost})) + # Create the greeting that will be used to check versions with the remote + $self->{strGreeting} = 'PG_BACKREST_REMOTE ' . version_get(); + + # Set default block size + if (!defined($iBlockSize)) + { + $self->{iBlockSize} = DEFAULT_BLOCK_SIZE; + } + else + { + $self->{iBlockSize} = $iBlockSize; + } + + # If host is defined then make a connnection + if (defined($strHost)) { # User must be defined - if (!defined($self->{strUser})) + if (!defined($strUser)) { confess &log(ASSERT, 'strUser must be defined'); } - # User must be defined - if (!defined($self->{strCommand})) + # Command must be defined + if (!defined($strCommand)) { confess &log(ASSERT, 'strCommand must be defined'); } + $self->{strHost} = $strHost; + $self->{strUser} = $strUser; + $self->{strCommand} = $strCommand; + # Set SSH Options my $strOptionSSHRequestTTY = 'RequestTTY=yes'; my $strOptionSSHCompression = 'Compression=no'; @@ -88,8 +81,8 @@ sub BUILD &log(TRACE, 'connecting to remote ssh host ' . $self->{strHost}); # Make SSH connection - $self->{oSSH} = Net::OpenSSH->new($self->{strHost}, timeout => 300, user => $self->{strUser}, - master_opts => [-o => $strOptionSSHCompression, -o => $strOptionSSHRequestTTY]); + $self->{oSSH} = Net::OpenSSH->new($self->{strHost}, timeout => 600, user => $self->{strUser}, + master_opts => [-o => $strOptionSSHCompression, -o => $strOptionSSHRequestTTY]); $self->{oSSH}->error and confess &log(ERROR, "unable to connect to $self->{strHost}: " . $self->{oSSH}->error); @@ -99,9 +92,7 @@ sub BUILD $self->greeting_read(); } - $self->{oThreadQueue} = Thread::Queue->new(); - $self->{oThreadResult} = Thread::Queue->new(); - $self->{oThread} = threads->create(\&binary_xfer_thread, $self); + return $self; } #################################################################################################################################### @@ -111,11 +102,9 @@ sub thread_kill { my $self = shift; - if (defined($self->{oThread})) + if (defined($self->{oCompressAsync})) { - $self->{oThreadQueue}->enqueue(undef); - $self->{oThread}->join(); - $self->{oThread} = undef; + $self->{oCompressAsync} = undef; } } @@ -127,11 +116,6 @@ sub DEMOLISH my $self = shift; $self->thread_kill(); - - if (defined($self->{oCompressAsync})) - { - $self->{oCompressAsync} = undef; - } } #################################################################################################################################### @@ -140,15 +124,13 @@ sub DEMOLISH sub clone { my $self = shift; - my $iThreadIdx = shift; return BackRest::Remote->new ( - strCommand => $self->{strCommand}, - strHost => $self->{strHost}, - strUser => $self->{strUser}, - iBlockSize => $self->{iBlockSize}, - iThreadIdx => $iThreadIdx + $self->{strHost}, + $self->{strUser}, + $self->{strCommand}, + $self->{iBlockSize} ); } @@ -371,42 +353,6 @@ sub wait_pid } } -#################################################################################################################################### -# BINARY_XFER_THREAD -# -# De/Compresses data on a thread. -#################################################################################################################################### -# sub binary_xfer_thread -# { -# my $self = shift; -# -# while (my $strMessage = $self->{oThreadQueue}->dequeue()) -# { -# my @stryMessage = split(':', $strMessage); -# my @strHandle = split(',', $stryMessage[1]); -# -# my $hIn = IO::Handle->new_from_fd($strHandle[0], '<'); -# my $hOut = IO::Handle->new_from_fd($strHandle[1], '>'); -# -# $self->{oThreadResult}->enqueue('running'); -# -# if ($stryMessage[0] eq 'compress') -# { -# gzip($hIn => $hOut) -# or confess &log(ERROR, 'unable to compress: ' . $GzipError); -# } -# else -# { -# gunzip($hIn => $hOut) -# or die confess &log(ERROR, 'unable to uncompress: ' . $GunzipError); -# } -# -# close($hOut); -# -# $self->{oThreadResult}->enqueue('complete'); -# } -# } - #################################################################################################################################### # BINARY_XFER # @@ -441,10 +387,6 @@ sub binary_xfer my $iBlockTotal = 0; my $strBlockHeader; my $strBlock; - my $oGzip; - my $hPipeIn; - my $hPipeOut; - my $pId; my $bThreadRunning = false; # Both the in and out streams must be defined @@ -456,55 +398,14 @@ sub binary_xfer # If this is output and the source is not already compressed if ($strRemote eq 'out' && !$bSourceCompressed) { - # Increase the blocksize since we are compressing - $iBlockSize *= 4; - - # Open the in/out pipes - pipe $hPipeOut, $hPipeIn; - - # Queue the compression job with the thread - $self->{oThreadQueue}->enqueue('compress:' . fileno($hIn) . ',' . fileno($hPipeIn)); - - # Wait for the thread to acknowledge that it has duplicated the file handles - my $strMessage = $self->{oThreadResult}->dequeue(); - - # Close input pipe so that thread has the only copy, reset hIn to hPipeOut - if ($strMessage eq 'running') - { - close($hPipeIn); - $hIn = $hPipeOut; - } - # If any other message is returned then error - else - { - confess "unknown thread message while waiting for running: ${strMessage}"; - } + $hIn = $self->compress_async_get()->process_begin('compress', $hIn); $bThreadRunning = true; } # Spawn a child process to do decompression elsif ($strRemote eq 'in' && !$bDestinationCompress) { - # Open the in/out pipes - pipe $hPipeOut, $hPipeIn; - - # Queue the decompression job with the thread - $self->{oThreadQueue}->enqueue('decompress:' . fileno($hPipeOut) . ',' . fileno($hOut)); - - # Wait for the thread to acknowledge that it has duplicated the file handles - my $strMessage = $self->{oThreadResult}->dequeue(); - - # Close output pipe so that thread has the only copy, reset hOut to hPipeIn - if ($strMessage eq 'running') - { - close($hPipeOut); - $hOut = $hPipeIn; - } - # If any other message is returned then error - else - { - confess "unknown thread message while waiting for running: ${strMessage}"; - } + $hOut = $self->compress_async_get()->process_begin('decompress', $hOut, 'out'); $bThreadRunning = true; } @@ -591,27 +492,7 @@ sub binary_xfer if ($bThreadRunning) { - # Make sure the de/compress pipes are closed - if ($strRemote eq 'out' && !$bSourceCompressed) - { - close($hPipeOut); - } - elsif ($strRemote eq 'in' && !$bDestinationCompress) - { - close($hPipeIn); - } - - # Wait for the thread to acknowledge that it has completed - my $strMessage = $self->{oThreadResult}->dequeue(); - - if ($strMessage eq 'complete') - { - } - # If any other message is returned then error - else - { - confess "unknown thread message while waiting for complete: ${strMessage}"; - } + $self->compress_async_get()->process_end(); } } @@ -837,5 +718,4 @@ sub command_execute return $self->output_read($bOutputRequired, $strErrorPrefix); } -no Moose; -__PACKAGE__->meta->make_immutable; +return true; diff --git a/test/lib/BackRestTest/BackupTest.pm b/test/lib/BackRestTest/BackupTest.pm index 36d60e593..9c35a18ac 100755 --- a/test/lib/BackRestTest/BackupTest.pm +++ b/test/lib/BackRestTest/BackupTest.pm @@ -259,9 +259,9 @@ sub BackRestTestBackup_Test #------------------------------------------------------------------------------------------------------------------------------- my $oRemote = BackRest::Remote->new ( - strHost => $strHost, - strUser => $strUserBackRest, - strCommand => BackRestTestCommon_CommandRemoteGet() + $strHost, + $strUserBackRest, + BackRestTestCommon_CommandRemoteGet() ); #------------------------------------------------------------------------------------------------------------------------------- diff --git a/test/lib/BackRestTest/CommonTest.pm b/test/lib/BackRestTest/CommonTest.pm index 9700ef00e..b52105847 100755 --- a/test/lib/BackRestTest/CommonTest.pm +++ b/test/lib/BackRestTest/CommonTest.pm @@ -171,15 +171,18 @@ sub BackRestTestCommon_ExecuteEnd # Check the exit status and output an error if needed my $iExitStatus = ${^CHILD_ERROR_NATIVE} >> 8; - if ($iExitStatus != 0 && !$bSuppressError) + if ($iExitStatus != 0) { - confess &log(ERROR, "command '${strCommand}' returned " . $iExitStatus . "\n" . - ($strOutLog ne '' ? "STDOUT:\n${strOutLog}" : '') . - ($strErrorLog ne '' ? "STDERR:\n${strErrorLog}" : '')); - } - else - { - &log(DEBUG, "suppressed error was ${iExitStatus}"); + if ($bSuppressError) + { + &log(DEBUG, "suppressed error was ${iExitStatus}"); + } + else + { + confess &log(ERROR, "command '${strCommand}' returned " . $iExitStatus . "\n" . + ($strOutLog ne '' ? "STDOUT:\n${strOutLog}" : '') . + ($strErrorLog ne '' ? "STDERR:\n${strErrorLog}" : '')); + } } $hError = undef; diff --git a/test/lib/BackRestTest/FileTest.pm b/test/lib/BackRestTest/FileTest.pm index ab53027d8..acbc521a5 100755 --- a/test/lib/BackRestTest/FileTest.pm +++ b/test/lib/BackRestTest/FileTest.pm @@ -89,11 +89,11 @@ sub BackRestTestFile_Test #------------------------------------------------------------------------------------------------------------------------------- # Create remote #------------------------------------------------------------------------------------------------------------------------------- - my $oRemote = BackRest::Remote->new + my $oRemote = new BackRest::Remote ( - strHost => $strHost, - strUser => $strUser, - strCommand => BackRestTestCommon_CommandRemoteGet() + $strHost, + $strUser, + BackRestTestCommon_CommandRemoteGet() ); #-------------------------------------------------------------------------------------------------------------------------------