1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2024-12-14 10:13:05 +02:00

Remote object now using ProcessAsync for de/compression.

Removed dependency on Moose from Remote.
This commit is contained in:
David Steele 2014-10-08 13:54:31 -04:00
parent 16ebbd2279
commit 3d86adadb5
7 changed files with 117 additions and 207 deletions

View File

@ -95,7 +95,7 @@ path=/var/lib/postgresql/9.3/main
#### simple multiple host install
This configuration is appropriate for a small installation where backups are being made remotely. Make sure that postgres@db-host has trusted ssh to backrest@backup-host and vice versa.
This configuration is appropriate for a small installation where backups are being made remotely. Make sure that postgres@db-host has trusted ssh to backrest@backup-host and vice versa. This configuration assumes that you have pg_backrest_remote.pl and pg_backrest.pl in the same path on both servers.
`/etc/pg_backrest.conf on the db host`:
```

View File

@ -244,14 +244,11 @@ sub remote_get()
{
if (!defined($oRemote) && $strRemote ne REMOTE_NONE)
{
$oRemote = BackRest::Remote->new
$oRemote = new BackRest::Remote
(
strHost =>
config_key_load($strRemote eq REMOTE_DB ? CONFIG_SECTION_STANZA : CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST, true),
strUser =>
config_key_load($strRemote eq REMOTE_DB ? CONFIG_SECTION_STANZA : CONFIG_SECTION_BACKUP, CONFIG_KEY_USER, true),
strCommand =>
config_key_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_REMOTE, true)
config_key_load($strRemote eq REMOTE_DB ? CONFIG_SECTION_STANZA : CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST, true),
config_key_load($strRemote eq REMOTE_DB ? CONFIG_SECTION_STANZA : CONFIG_SECTION_BACKUP, CONFIG_KEY_USER, true),
config_key_load(CONFIG_SECTION_COMMAND, CONFIG_KEY_REMOTE, true)
);
}

View File

@ -17,22 +17,21 @@ use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use lib dirname($0) . '/../lib';
use BackRest::Utility;
#use Exporter qw(import);
#our @EXPORT = qw(new DESTROY thread_kill process_begin process_end);
####################################################################################################################################
# CONSTRUCTOR
####################################################################################################################################
sub new
{
my $class = shift;
# Create the class hash
my $self = {};
bless $self, $class;
$self->{oThreadQueue} = Thread::Queue->new();
$self->{oThreadResult} = Thread::Queue->new();
$self->{oThread} = threads->create(\&process_thread, $self);
bless $self, $class;
return $self;
}
@ -105,8 +104,22 @@ sub process_thread
sub process_begin
{
my $self = shift;
my $strProcess = shift;
my $hIn = shift;
my $strProcess = shift; # process to run (compress, decompress, checksum, size)
my $hHandle = shift; # Handle of the input or output
my $strDirection = shift; # Does hHandle represent in or out?
# Set/check value of strDirection
if (!defined($strDirection))
{
$strDirection = 'in';
}
else
{
if ($strDirection ne 'in' && $strDirection ne 'out')
{
confess &log(ASSERT, 'strDirection must be in (in, out) in ProcessAsync->process_begin()');
}
}
# Check if thread is already running
if (defined($self->{hPipeOut}))
@ -127,18 +140,26 @@ sub process_begin
}
# Validate hIn
if (!defined($hIn))
if (!defined($hHandle))
{
confess &log(ASSERT, 'strProcess must be defined in call to ProcessAsync->process_begin()');
confess &log(ASSERT, 'hHandle must be defined in call to ProcessAsync->process_begin()');
}
# Open the in/out pipes
# Open the out/in pipes
my $hPipeOut;
my $hPipeIn;
pipe $self->{hPipeOut}, $hPipeIn;
pipe $hPipeOut, $hPipeIn;
# Queue the queue job with the thread
$self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hIn) . ',' . fileno($hPipeIn));
# Queue the job in the thread
if ($strDirection eq 'in')
{
$self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hHandle) . ',' . fileno($hPipeIn));
}
else
{
$self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hPipeOut) . ',' . fileno($hHandle));
}
# Wait for the thread to acknowledge that it has duplicated the file handles
my $strMessage = $self->{oThreadResult}->dequeue();
@ -146,7 +167,16 @@ sub process_begin
# Close input pipe so that thread has the only copy
if ($strMessage eq 'running')
{
close($hPipeIn);
if ($strDirection eq 'in')
{
close($hPipeIn);
$self->{hPipe} = $hPipeOut;
}
else
{
close($hPipeOut);
$self->{hPipe} = $hPipeIn;
}
}
# If any other message is returned then error
else
@ -154,7 +184,7 @@ sub process_begin
confess "unknown thread message while waiting for running: ${strMessage}";
}
return $self->{hPipeOut};
return $self->{hPipe};
}
####################################################################################################################################
@ -167,14 +197,14 @@ sub process_end
my $self = shift;
# Check if thread is not running
if (!defined($self->{hPipeOut}))
if (!defined($self->{hPipe}))
{
confess &log(ASSERT, 'thread is not running in ProcessAsync->process_end()');
}
# Make sure the de/compress pipes are closed
close($self->{hPipeOut});
$self->{hPipeOut} = undef;
close($self->{hPipe});
$self->{hPipe} = undef;
# Wait for the thread to acknowledge that it has completed
my $strMessage = $self->{oThreadResult}->dequeue();

View File

@ -8,18 +8,16 @@ use strict;
use warnings;
use Carp;
use Moose;
use Thread::Queue;
use Scalar::Util;
use Net::OpenSSH;
use File::Basename;
use IO::Handle;
use POSIX ':sys_wait_h';
use IO::Compress::Gzip qw(gzip $GzipError);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use Scalar::Util 'blessed';
use lib dirname($0) . '/../lib';
use BackRest::Exception;
use BackRest::Utility;
use BackRest::ProcessAsync;
####################################################################################################################################
# Remote xfer default block size constant
@ -29,58 +27,53 @@ use constant
DEFAULT_BLOCK_SIZE => 1048576
};
####################################################################################################################################
# Module variables
####################################################################################################################################
# Protocol strings
has strGreeting => (is => 'ro', default => 'PG_BACKREST_REMOTE');
# Command strings
has strCommand => (is => 'bare');
# Module variables
has strHost => (is => 'bare'); # Host host
has strUser => (is => 'bare'); # User user
has oSSH => (is => 'bare'); # SSH object
# Process variables
has pId => (is => 'bare'); # Process Id
has hIn => (is => 'bare'); # Input stream
has hOut => (is => 'bare'); # Output stream
has hErr => (is => 'bare'); # Error stream
# Thread variables
has iThreadIdx => (is => 'bare'); # Thread index
has oThread => (is => 'bare'); # Thread object
has oThreadQueue => (is => 'bare'); # Thread queue object
has oThreadResult => (is => 'bare'); # Thread result object
# Block size
has iBlockSize => (is => 'bare', default => DEFAULT_BLOCK_SIZE); # Set block size to default
####################################################################################################################################
# CONSTRUCTOR
####################################################################################################################################
sub BUILD
sub new
{
my $self = shift;
my $class = shift; # Class name
my $strHost = shift; # Host to connect to for remote (optional as this can also be used on the remote)
my $strUser = shift; # User to connect to for remote (must be set if strHost is set)
my $strCommand = shift; # Command to execute on remote
my $iBlockSize = shift; # Optionally, set the block size (defaults to DEFAULT_BLOCK_SIZE)
$self->{strGreeting} .= ' ' . version_get();
# Create the class hash
my $self = {};
bless $self, $class;
if (defined($self->{strHost}))
# Create the greeting that will be used to check versions with the remote
$self->{strGreeting} = 'PG_BACKREST_REMOTE ' . version_get();
# Set default block size
if (!defined($iBlockSize))
{
$self->{iBlockSize} = DEFAULT_BLOCK_SIZE;
}
else
{
$self->{iBlockSize} = $iBlockSize;
}
# If host is defined then make a connnection
if (defined($strHost))
{
# User must be defined
if (!defined($self->{strUser}))
if (!defined($strUser))
{
confess &log(ASSERT, 'strUser must be defined');
}
# User must be defined
if (!defined($self->{strCommand}))
# Command must be defined
if (!defined($strCommand))
{
confess &log(ASSERT, 'strCommand must be defined');
}
$self->{strHost} = $strHost;
$self->{strUser} = $strUser;
$self->{strCommand} = $strCommand;
# Set SSH Options
my $strOptionSSHRequestTTY = 'RequestTTY=yes';
my $strOptionSSHCompression = 'Compression=no';
@ -88,8 +81,8 @@ sub BUILD
&log(TRACE, 'connecting to remote ssh host ' . $self->{strHost});
# Make SSH connection
$self->{oSSH} = Net::OpenSSH->new($self->{strHost}, timeout => 300, user => $self->{strUser},
master_opts => [-o => $strOptionSSHCompression, -o => $strOptionSSHRequestTTY]);
$self->{oSSH} = Net::OpenSSH->new($self->{strHost}, timeout => 600, user => $self->{strUser},
master_opts => [-o => $strOptionSSHCompression, -o => $strOptionSSHRequestTTY]);
$self->{oSSH}->error and confess &log(ERROR, "unable to connect to $self->{strHost}: " . $self->{oSSH}->error);
@ -99,9 +92,7 @@ sub BUILD
$self->greeting_read();
}
$self->{oThreadQueue} = Thread::Queue->new();
$self->{oThreadResult} = Thread::Queue->new();
$self->{oThread} = threads->create(\&binary_xfer_thread, $self);
return $self;
}
####################################################################################################################################
@ -111,11 +102,9 @@ sub thread_kill
{
my $self = shift;
if (defined($self->{oThread}))
if (defined($self->{oCompressAsync}))
{
$self->{oThreadQueue}->enqueue(undef);
$self->{oThread}->join();
$self->{oThread} = undef;
$self->{oCompressAsync} = undef;
}
}
@ -127,11 +116,6 @@ sub DEMOLISH
my $self = shift;
$self->thread_kill();
if (defined($self->{oCompressAsync}))
{
$self->{oCompressAsync} = undef;
}
}
####################################################################################################################################
@ -140,15 +124,13 @@ sub DEMOLISH
sub clone
{
my $self = shift;
my $iThreadIdx = shift;
return BackRest::Remote->new
(
strCommand => $self->{strCommand},
strHost => $self->{strHost},
strUser => $self->{strUser},
iBlockSize => $self->{iBlockSize},
iThreadIdx => $iThreadIdx
$self->{strHost},
$self->{strUser},
$self->{strCommand},
$self->{iBlockSize}
);
}
@ -371,42 +353,6 @@ sub wait_pid
}
}
####################################################################################################################################
# BINARY_XFER_THREAD
#
# De/Compresses data on a thread.
####################################################################################################################################
# sub binary_xfer_thread
# {
# my $self = shift;
#
# while (my $strMessage = $self->{oThreadQueue}->dequeue())
# {
# my @stryMessage = split(':', $strMessage);
# my @strHandle = split(',', $stryMessage[1]);
#
# my $hIn = IO::Handle->new_from_fd($strHandle[0], '<');
# my $hOut = IO::Handle->new_from_fd($strHandle[1], '>');
#
# $self->{oThreadResult}->enqueue('running');
#
# if ($stryMessage[0] eq 'compress')
# {
# gzip($hIn => $hOut)
# or confess &log(ERROR, 'unable to compress: ' . $GzipError);
# }
# else
# {
# gunzip($hIn => $hOut)
# or die confess &log(ERROR, 'unable to uncompress: ' . $GunzipError);
# }
#
# close($hOut);
#
# $self->{oThreadResult}->enqueue('complete');
# }
# }
####################################################################################################################################
# BINARY_XFER
#
@ -441,10 +387,6 @@ sub binary_xfer
my $iBlockTotal = 0;
my $strBlockHeader;
my $strBlock;
my $oGzip;
my $hPipeIn;
my $hPipeOut;
my $pId;
my $bThreadRunning = false;
# Both the in and out streams must be defined
@ -456,55 +398,14 @@ sub binary_xfer
# If this is output and the source is not already compressed
if ($strRemote eq 'out' && !$bSourceCompressed)
{
# Increase the blocksize since we are compressing
$iBlockSize *= 4;
# Open the in/out pipes
pipe $hPipeOut, $hPipeIn;
# Queue the compression job with the thread
$self->{oThreadQueue}->enqueue('compress:' . fileno($hIn) . ',' . fileno($hPipeIn));
# Wait for the thread to acknowledge that it has duplicated the file handles
my $strMessage = $self->{oThreadResult}->dequeue();
# Close input pipe so that thread has the only copy, reset hIn to hPipeOut
if ($strMessage eq 'running')
{
close($hPipeIn);
$hIn = $hPipeOut;
}
# If any other message is returned then error
else
{
confess "unknown thread message while waiting for running: ${strMessage}";
}
$hIn = $self->compress_async_get()->process_begin('compress', $hIn);
$bThreadRunning = true;
}
# Spawn a child process to do decompression
elsif ($strRemote eq 'in' && !$bDestinationCompress)
{
# Open the in/out pipes
pipe $hPipeOut, $hPipeIn;
# Queue the decompression job with the thread
$self->{oThreadQueue}->enqueue('decompress:' . fileno($hPipeOut) . ',' . fileno($hOut));
# Wait for the thread to acknowledge that it has duplicated the file handles
my $strMessage = $self->{oThreadResult}->dequeue();
# Close output pipe so that thread has the only copy, reset hOut to hPipeIn
if ($strMessage eq 'running')
{
close($hPipeOut);
$hOut = $hPipeIn;
}
# If any other message is returned then error
else
{
confess "unknown thread message while waiting for running: ${strMessage}";
}
$hOut = $self->compress_async_get()->process_begin('decompress', $hOut, 'out');
$bThreadRunning = true;
}
@ -591,27 +492,7 @@ sub binary_xfer
if ($bThreadRunning)
{
# Make sure the de/compress pipes are closed
if ($strRemote eq 'out' && !$bSourceCompressed)
{
close($hPipeOut);
}
elsif ($strRemote eq 'in' && !$bDestinationCompress)
{
close($hPipeIn);
}
# Wait for the thread to acknowledge that it has completed
my $strMessage = $self->{oThreadResult}->dequeue();
if ($strMessage eq 'complete')
{
}
# If any other message is returned then error
else
{
confess "unknown thread message while waiting for complete: ${strMessage}";
}
$self->compress_async_get()->process_end();
}
}
@ -837,5 +718,4 @@ sub command_execute
return $self->output_read($bOutputRequired, $strErrorPrefix);
}
no Moose;
__PACKAGE__->meta->make_immutable;
return true;

View File

@ -259,9 +259,9 @@ sub BackRestTestBackup_Test
#-------------------------------------------------------------------------------------------------------------------------------
my $oRemote = BackRest::Remote->new
(
strHost => $strHost,
strUser => $strUserBackRest,
strCommand => BackRestTestCommon_CommandRemoteGet()
$strHost,
$strUserBackRest,
BackRestTestCommon_CommandRemoteGet()
);
#-------------------------------------------------------------------------------------------------------------------------------

View File

@ -171,15 +171,18 @@ sub BackRestTestCommon_ExecuteEnd
# Check the exit status and output an error if needed
my $iExitStatus = ${^CHILD_ERROR_NATIVE} >> 8;
if ($iExitStatus != 0 && !$bSuppressError)
if ($iExitStatus != 0)
{
confess &log(ERROR, "command '${strCommand}' returned " . $iExitStatus . "\n" .
($strOutLog ne '' ? "STDOUT:\n${strOutLog}" : '') .
($strErrorLog ne '' ? "STDERR:\n${strErrorLog}" : ''));
}
else
{
&log(DEBUG, "suppressed error was ${iExitStatus}");
if ($bSuppressError)
{
&log(DEBUG, "suppressed error was ${iExitStatus}");
}
else
{
confess &log(ERROR, "command '${strCommand}' returned " . $iExitStatus . "\n" .
($strOutLog ne '' ? "STDOUT:\n${strOutLog}" : '') .
($strErrorLog ne '' ? "STDERR:\n${strErrorLog}" : ''));
}
}
$hError = undef;

View File

@ -89,11 +89,11 @@ sub BackRestTestFile_Test
#-------------------------------------------------------------------------------------------------------------------------------
# Create remote
#-------------------------------------------------------------------------------------------------------------------------------
my $oRemote = BackRest::Remote->new
my $oRemote = new BackRest::Remote
(
strHost => $strHost,
strUser => $strUser,
strCommand => BackRestTestCommon_CommandRemoteGet()
$strHost,
$strUser,
BackRestTestCommon_CommandRemoteGet()
);
#-------------------------------------------------------------------------------------------------------------------------------