1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2024-12-14 10:13:05 +02:00
pgbackrest/lib/BackRest/Remote.pm

785 lines
22 KiB
Perl
Raw Normal View History

2014-06-07 04:16:24 +03:00
####################################################################################################################################
# REMOTE MODULE
####################################################################################################################################
package BackRest::Remote;
2014-06-07 04:16:24 +03:00
use threads;
use strict;
use warnings;
use Carp;
use Moose;
use Net::OpenSSH;
use File::Basename;
use POSIX ":sys_wait_h";
use IO::Compress::Gzip qw(gzip $GzipError);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
2014-06-07 18:51:27 +03:00
use lib dirname($0) . "/../lib";
use BackRest::Exception;
2014-06-07 23:13:41 +03:00
use BackRest::Utility;
2014-06-07 04:16:24 +03:00
####################################################################################################################################
# Remote xfer default block size constant
####################################################################################################################################
# DEFAULT_BLOCK_SIZE is the default value of the iBlockSize attribute.  binary_xfer passes iBlockSize to sysread as the read length,
# so the unit is bytes.
# NOTE(review): 4 bytes per block looks like a debugging value - confirm the intended production size.
use constant
{
DEFAULT_BLOCK_SIZE => 4
};
####################################################################################################################################
# Module variables
####################################################################################################################################
2014-06-07 04:16:24 +03:00
# Protocol strings
2014-06-07 23:06:46 +03:00
# Base greeting string.  BUILD appends a space and the result of version_get() to it, and greeting_read/greeting_write compare and
# send the combined value.
has strGreeting => (is => 'ro', default => 'PG_BACKREST_REMOTE');
2014-06-07 04:16:24 +03:00
# Command strings
has strCommand => (is => 'bare'); # Command executed on the remote host via open3 in BUILD
# Module variables
has strHost => (is => 'bare'); # Remote host to connect to (BUILD only opens an SSH connection when this is defined)
has strUser => (is => 'bare'); # User for the SSH connection (required when strHost is defined)
has oSSH => (is => 'bare'); # Net::OpenSSH connection object
# Process variables (all four are assigned from the list returned by open3 in BUILD and cleared by wait_pid)
has pId => (is => 'bare'); # Process id of the remote command
has hIn => (is => 'bare'); # Handle that commands are written to (see command_write)
has hOut => (is => 'bare'); # Handle that greeting and command output are read from (see greeting_read/output_read)
has hErr => (is => 'bare'); # Handle that error text is read from when the remote process terminates (see wait_pid)
# Block size
has iBlockSize => (is => 'bare', default => DEFAULT_BLOCK_SIZE); # Read length used by binary_xfer, defaults to DEFAULT_BLOCK_SIZE
2014-06-13 04:56:20 +03:00
2014-06-07 04:16:24 +03:00
####################################################################################################################################
# CONSTRUCTOR
####################################################################################################################################
# Moose BUILD hook, run after the object has been constructed.
#
# Appends the version to the greeting.  When strHost is defined it also opens the SSH connection, starts strCommand on the remote
# host and validates the greeting sent by the remote process.  When strHost is not defined no connection is made.
#
# Throws (confess) when strUser or strCommand is missing, when the SSH connection cannot be made, when the remote command cannot
# be started, or when the remote greeting does not match.
sub BUILD
{
    my $self = shift;

    # The greeting includes the version so mismatched local/remote versions are detected by greeting_read
    $self->{strGreeting} .= " " . version_get();

    # Nothing to connect to unless a host was specified
    if (!defined($self->{strHost}))
    {
        return;
    }

    # User must be defined
    if (!defined($self->{strUser}))
    {
        confess &log(ASSERT, "strUser must be defined");
    }

    # Command must be defined
    if (!defined($self->{strCommand}))
    {
        confess &log(ASSERT, "strCommand must be defined");
    }

    # Set SSH Options
    my $strOptionSSHRequestTTY = "RequestTTY=yes";
    my $strOptionSSHCompression = "Compression=no";

    &log(TRACE, "connecting to remote ssh host " . $self->{strHost});

    # Make SSH connection
    $self->{oSSH} = Net::OpenSSH->new($self->{strHost}, timeout => 300, user => $self->{strUser},
                                      master_opts => [-o => $strOptionSSHCompression, -o => $strOptionSSHRequestTTY]);

    $self->{oSSH}->error and confess &log(ERROR, "unable to connect to $self->{strHost}: " . $self->{oSSH}->error);

    # Execute remote command
    ($self->{hIn}, $self->{hOut}, $self->{hErr}, $self->{pId}) = $self->{oSSH}->open3($self->{strCommand});

    # open3 returns nothing on failure, so without this check greeting_read would try to read from an undefined handle
    if (!defined($self->{pId}))
    {
        confess &log(ERROR, "unable to execute remote command on $self->{strHost}: " . $self->{oSSH}->error);
    }

    $self->greeting_read();
}
####################################################################################################################################
# CLONE
####################################################################################################################################
# Creates a new object of the same class with the same command, host, user and block size as this one.  Because the constructor
# is called again, the new object opens its own connection when strHost is defined.
#
# Returns the new object.
sub clone
{
    my $self = shift;

    # Use the class of the current object rather than a hard-coded name so the constructor that is called always exists
    return ref($self)->new
    (
        strCommand => $self->{strCommand},
        strHost => $self->{strHost},
        strUser => $self->{strUser},
        iBlockSize => $self->{iBlockSize}
    );
}
####################################################################################################################################
# GREETING_READ
####################################################################################################################################
# Reads one line from hOut and throws unless it is exactly equal to the local greeting (which includes the version).
sub greeting_read
{
    my $self = shift;

    # Make sure that the remote is running the right version
    my $strRemoteGreeting = $self->read_line($self->{hOut});

    unless ($strRemoteGreeting eq $self->{strGreeting})
    {
        confess &log(ERROR, "remote version mismatch");
    }
}
####################################################################################################################################
# GREETING_WRITE
####################################################################################################################################
# Writes the greeting followed by a linefeed to STDOUT.  Throws when nothing could be written.
sub greeting_write
{
    my $self = shift;

    my $iWritten = syswrite(*STDOUT, "$self->{strGreeting}\n");

    confess "unable to write greeting" if !$iWritten;
}
2014-06-07 18:51:27 +03:00
####################################################################################################################################
# STRING_WRITE
####################################################################################################################################
# Writes a string to a handle with a "." prefixed to every line, i.e. one at the start of the string and one after each linefeed.
#
# hOut      - handle to write to
# strBuffer - string to write
#
# Throws when nothing could be written.
sub string_write
{
    my $self = shift;
    my $hOut = shift;
    my $strBuffer = shift;

    # Split on every linefeed (keeping trailing empty lines) and rejoin so that each line after the first starts with a dot
    my $strEscaped = join("\n.", split(/\n/, $strBuffer, -1));

    # The leading dot covers the first line
    syswrite($hOut, "." . $strEscaped)
        or confess "unable to write string";
}
####################################################################################################################################
# PIPE_TO_STRING Function
#
# Copies data from a file handle into a string.
####################################################################################################################################
# Reads from a handle in blocks of iBlockSize bytes until end of file and returns everything that was read.
#
# hOut - handle to read from
#
# Returns the data read, or an empty string when the handle is already at end of file.  Throws when the handle is not defined or
# a read fails.
#
# The handle is read directly instead of copying through an IO::String handle because IO::String is not a core module and is not
# loaded by this file.
sub pipe_to_string
{
    my $self = shift;
    my $hOut = shift;

    if (!defined($hOut))
    {
        confess "hIn or hOut is not defined";
    }

    my $iBlockSize = $self->{iBlockSize};

    # Start with an empty string so an empty stream yields '' rather than undef
    my $strBuffer = '';

    while (1)
    {
        my $strBlock;
        my $iBlockIn = sysread($hOut, $strBlock, $iBlockSize);

        if (!defined($iBlockIn))
        {
            confess "unable to read";
        }

        # Zero bytes means end of file
        if ($iBlockIn == 0)
        {
            last;
        }

        $strBuffer .= $strBlock;
    }

    return $strBuffer;
}
2014-06-07 18:51:27 +03:00
####################################################################################################################################
# ERROR_WRITE
####################################################################################################################################
# Reports an error to the other side of the connection.
#
# oMessage - error to report
#
# A BackRest::Exception has its trimmed message written to STDOUT with string_write, followed by a line containing "ERROR" and
# the exception code (when defined).  Anything else is written to STDERR and the process exits with status 1.
sub error_write
{
    my $self = shift;
    my $oMessage = shift;

    # Plain (unblessed) messages go straight to stderr
    if (!blessed($oMessage))
    {
        syswrite(*STDERR, $oMessage);
        exit 1;
    }

    # Objects other than BackRest::Exception cannot be decoded
    if (!$oMessage->isa("BackRest::Exception"))
    {
        syswrite(*STDERR, 'unknown error object: ' . $oMessage);
        exit 1;
    }

    my $iCode = $oMessage->code();
    my $strMessage = $oMessage->message();

    # Write the message (when there is one) before the error marker
    if (defined($strMessage))
    {
        $self->string_write(*STDOUT, trim($strMessage));
    }

    my $strErrorLine = "\nERROR" . (defined($iCode) ? " $iCode" : "") . "\n";

    syswrite(*STDOUT, $strErrorLine)
        or confess "unable to write error";
}
####################################################################################################################################
# READ_LINE
####################################################################################################################################
# Reads a single line from a handle one byte at a time.
#
# hIn    - handle to read from
# bError - when defined and false, undef is returned if a byte cannot be read; otherwise an error is thrown
#
# Returns the line without the terminating linefeed.  An empty line returns undef because nothing is ever appended to the result.
# When a byte cannot be read wait_pid is called first, which throws if the remote process has terminated.
sub read_line
{
    my $self = shift;
    my $hIn = shift;
    my $bError = shift;

    my $strLine;

    for (;;)
    {
        my $strChar;
        my $iByteIn = sysread($hIn, $strChar, 1);

        # Anything other than exactly one byte is a failure (error or end of file)
        unless (defined($iByteIn) && $iByteIn == 1)
        {
            $self->wait_pid();

            return undef if defined($bError) && !$bError;

            confess &log(ERROR, "unable to read 1 byte" . (defined($!) ? ": " . $! : ""));
        }

        # Linefeed terminates the line and is not part of the result
        return $strLine if $strChar eq "\n";

        $strLine .= $strChar;
    }
}
####################################################################################################################################
# WAIT_PID
####################################################################################################################################
# Checks, without blocking, whether the remote process is still running.
#
# Does nothing when there is no process id or waitpid reports 0.  Otherwise whatever can be read from hErr is collected, the
# process id and all handles are cleared, and an error containing the collected text is thrown.
sub wait_pid
{
    my $self = shift;

    # Nothing to do when there is no process or waitpid reports it as still running
    return unless defined($self->{pId}) && waitpid($self->{pId}, WNOHANG) != 0;

    # Collect whatever the process wrote to stderr, if the handle is still available
    my $strError = defined($self->{hErr})
        ? $self->pipe_to_string($self->{hErr})
        : "no error captured because stderr is already closed";

    # The process is gone so the process id and handles are no longer valid
    foreach my $strKey (qw(pId hIn hOut hErr))
    {
        $self->{$strKey} = undef;
    }

    confess &log(ERROR, "remote process terminated: ${strError}");
}
2014-06-13 04:56:20 +03:00
####################################################################################################################################
# BINARY_XFER
#
# Copies data from one file handle to another, optionally compressing or decompressing the data in stream.
####################################################################################################################################
# hIn       - handle to read from
# hOut      - handle to write to
# strRemote - 'out', 'in', or undef (treated as 'none')
#             'out'  : hIn is gzip-compressed by a forked child and each block written to hOut is preceded by a
#                      "block <size>" header line; a "block 0" header marks the end of the data
#             'in'   : hIn is expected to contain "block <size>" header lines, each followed by that many bytes, terminated
#                      by "block 0"
#             'none' : data is copied as is until end of file
# bCompress - only consulted when strRemote is 'in': when defined and false the received data is gunzipped by a forked child
#             before it reaches hOut
#
# Throws when a handle is missing, when a pipe or child process cannot be created, on read/write failure, on a malformed block
# header, or when the compression/decompression child exits with a non-zero status.
sub binary_xfer
{
    my $self = shift;
    my $hIn = shift;
    my $hOut = shift;
    my $strRemote = shift;
    my $bCompress = shift;

    $strRemote = defined($strRemote) ? $strRemote : 'none';

    my $iBlockSize = $self->{iBlockSize};
    my $iBlockIn;
    my $iBlockOut;
    my $strBlockHeader;
    my $strBlock;
    my $hPipeIn;
    my $hPipeOut;
    my $pId;

    if (!defined($hIn) || !defined($hOut))
    {
        confess &log(ASSERT, "hIn or hOut is not defined");
    }

    # Data read from the remote is only decompressed when bCompress is explicitly false
    my $bDecompress = $strRemote eq "in" && defined($bCompress) && !$bCompress;

    if ($strRemote eq "out")
    {
        pipe($hPipeOut, $hPipeIn)
            or confess &log(ERROR, "unable to create pipe: " . $!);

        # Fork a child to compress hIn into the pipe.  fork returns undef on failure, which must not be mistaken for the child.
        $pId = fork();

        if (!defined($pId))
        {
            confess &log(ERROR, "unable to fork compression process: " . $!);
        }

        if ($pId == 0)
        {
            close($hPipeOut);

            gzip($hIn => $hPipeIn)
                or confess &log(ERROR, "unable to compress: " . $GzipError);

            close($hPipeIn);
            exit 0;
        }

        # The parent reads the compressed data from the pipe instead of the original handle
        close($hPipeIn);
        $hIn = $hPipeOut;
    }
    elsif ($bDecompress)
    {
        pipe($hPipeOut, $hPipeIn)
            or confess &log(ERROR, "unable to create pipe: " . $!);

        # Fork a child to decompress the pipe into hOut.  fork returns undef on failure, which must not be mistaken for the
        # child.
        $pId = fork();

        if (!defined($pId))
        {
            confess &log(ERROR, "unable to fork decompression process: " . $!);
        }

        if ($pId == 0)
        {
            close($hPipeIn);

            gunzip($hPipeOut => $hOut)
                or confess &log(ERROR, "unable to uncompress: " . $GunzipError);

            close($hPipeOut);
            exit 0;
        }

        # The parent writes the compressed data to the pipe instead of the original handle
        close($hPipeOut);
        $hOut = $hPipeIn;
    }

    while (1)
    {
        if ($strRemote eq 'in')
        {
            # Each block from the remote is preceded by a header containing the block size
            $strBlockHeader = $self->read_line($hIn);

            my ($iBlockSizeIn) = $strBlockHeader =~ /^block ([0-9]+)$/;

            if (!defined($iBlockSizeIn))
            {
                confess "unable to read block header ${strBlockHeader}";
            }

            $iBlockSize = $iBlockSizeIn;

            # A single sysread may return fewer bytes than requested, so keep reading until the entire block has arrived.  A
            # block size of zero skips the loop and ends the transfer below.
            $iBlockIn = 0;
            $strBlock = '';

            while ($iBlockIn < $iBlockSize)
            {
                my $iRead = sysread($hIn, $strBlock, $iBlockSize - $iBlockIn, $iBlockIn);

                if (!defined($iRead) || $iRead == 0)
                {
                    confess "unable to read ${iBlockSize} bytes from remote" . (defined($iRead) ? "" : ": " . $!);
                }

                $iBlockIn += $iRead;
            }
        }
        else
        {
            $iBlockIn = sysread($hIn, $strBlock, $iBlockSize);

            if (!defined($iBlockIn))
            {
                confess &log(ERROR, "unable to read");
            }
        }

        # When writing to the remote every block (including the final zero-length block) is preceded by a header
        if ($strRemote eq 'out')
        {
            $strBlockHeader = "block ${iBlockIn}\n";

            $iBlockOut = syswrite($hOut, $strBlockHeader);

            if (!defined($iBlockOut) || $iBlockOut != length($strBlockHeader))
            {
                confess "unable to write block header";
            }
        }

        # Zero bytes read means the transfer is complete
        if ($iBlockIn <= 0)
        {
            last;
        }

        # Write to the output handle
        $iBlockOut = syswrite($hOut, $strBlock, $iBlockIn);

        if (!defined($iBlockOut) || $iBlockOut != $iBlockIn)
        {
            confess "unable to write ${iBlockIn} bytes" . (defined($!) ? ": " . $! : "");
        }
    }

    # Close the parent's end of the pipe and make sure the child completed successfully
    if (defined($pId))
    {
        if ($strRemote eq "out")
        {
            close($hPipeOut);
        }
        elsif ($bDecompress)
        {
            # Closing the write end signals end of file to the decompression child
            close($hPipeIn);
        }

        waitpid($pId, 0);
        my $iExitStatus = ${^CHILD_ERROR_NATIVE} >> 8;

        if ($iExitStatus != 0)
        {
            confess &log(ERROR, "compression/decompression child process returned " . $iExitStatus);
        }
    }
}
2014-06-07 18:51:27 +03:00
####################################################################################################################################
# OUTPUT_READ
####################################################################################################################################
# Reads the output of a command from hOut.
#
# bOutputRequired - when true an error is thrown if the command produced no output
# strErrorPrefix  - optional text placed in front of error messages
#
# Lines are read until a line starting with "ERROR", a line that is exactly "OK", a false line, or a read failure.  The first
# character of every other line is dropped and the lines are joined with linefeeds.  An ERROR line causes the output collected
# so far to be thrown as an error, with the second space-separated field of the ERROR line passed to log as the code.
#
# Returns the collected output, or undef when there was none and output is not required.
sub output_read
{
    my $self = shift;
    my $bOutputRequired = shift;
    my $strErrorPrefix = shift;

    my $strOutput;
    my $iErrorCode;
    my $bError = false;

    while (my $strLine = $self->read_line($self->{hOut}, false))
    {
        # An error line ends the output and carries the (optional) error code
        if ($strLine =~ /^ERROR.*/)
        {
            $iErrorCode = (split(' ', $strLine))[1];
            $bError = true;
            last;
        }

        # OK marks successful completion
        last if $strLine =~ /^OK$/;

        # Strip the first character and append, separating lines with a linefeed
        my $strSeparator = defined($strOutput) ? "\n" : "";
        $strOutput .= $strSeparator . substr($strLine, 1);
    }

    # Capture any errors
    if ($bError)
    {
        confess &log(ERROR, (defined($strErrorPrefix) ? "${strErrorPrefix}" : "") .
                            (defined($strOutput) ? ": ${strOutput}" : ""), $iErrorCode);
    }

    # Throws if the remote process has terminated
    $self->wait_pid();

    # Output was found or is not required
    if (!$bOutputRequired || defined($strOutput))
    {
        return $strOutput;
    }

    # No output where output is required, so report whatever can be read from stderr
    my $strStdErr = $self->pipe_to_string($self->{hErr});

    if (defined($strStdErr))
    {
        confess &log(ERROR, (defined($strErrorPrefix) ? "${strErrorPrefix}" : "") . ": ${strStdErr}");
    }

    confess &log(ERROR, (defined($strErrorPrefix) ? "${strErrorPrefix}: " : "") . "output is not defined");
}
####################################################################################################################################
# OUTPUT_WRITE
####################################################################################################################################
# Writes command output to STDOUT followed by the OK marker.
#
# strOutput - output to write; when undef only the OK marker is written
#
# The output is written with string_write and followed by a linefeed.  Throws when a write fails.
sub output_write
{
    my $self = shift;
    my $strOutput = shift;

    if (defined($strOutput))
    {
        $self->string_write(*STDOUT, "${strOutput}");

        syswrite(*STDOUT, "\n")
            or confess "unable to write output";
    }

    syswrite(*STDOUT, "OK\n")
        or confess "unable to write output";
}
####################################################################################################################################
2014-06-07 22:01:29 +03:00
# COMMAND_PARAM_STRING
2014-06-07 18:51:27 +03:00
####################################################################################################################################
2014-06-07 22:01:29 +03:00
# Formats a parameter hash as a comma-separated list of name=value pairs sorted by name.
#
# oParamHashRef - reference to the hash of parameters
#
# Undefined values are shown as [undef].  Returns undef when the hash is empty.
sub command_param_string
{
    my $self = shift;
    my $oParamHashRef = shift;

    my $strParamList;

    # The hash reference is dereferenced explicitly because calling keys on a reference is a fatal error in Perl 5.24 and later
    foreach my $strParam (sort(keys(%{$oParamHashRef})))
    {
        my $strValue = $oParamHashRef->{$strParam};

        $strParamList .= (defined($strParamList) ? "," : "") . "${strParam}=" .
                         (defined($strValue) ? $strValue : "[undef]");
    }

    return $strParamList;
}
####################################################################################################################################
# COMMAND_READ
####################################################################################################################################
# Reads a command and its parameters from STDIN.
#
# oParamHashRef - reference to a hash that receives the parameters
#
# A first line that does not end with ":" is a complete command without parameters.  Otherwise the trailing ":" is removed and
# the following name=value lines are stored in the hash until a line containing "end" (or a false line) is read.
#
# Returns the command, or undef when the first line read is false.  Throws when a parameter line has no = character.
sub command_read
{
    my $self = shift;
    my $oParamHashRef = shift;

    my $strCommand;
    my $strHeader = $self->read_line(*STDIN);

    if ($strHeader)
    {
        if ($strHeader !~ /:$/)
        {
            # Command without parameters
            $strCommand = $strHeader;
        }
        else
        {
            # Strip the trailing colon and read parameters until the end marker
            $strCommand = substr($strHeader, 0, length($strHeader) - 1);

            while (my $strParamLine = $self->read_line(*STDIN))
            {
                last if $strParamLine eq 'end';

                # Everything before the first = is the name, everything after is the value
                my $iPos = index($strParamLine, "=");

                if ($iPos == -1)
                {
                    confess "param \"${strParamLine}\" is missing = character";
                }

                my $strParam = substr($strParamLine, 0, $iPos);

                $oParamHashRef->{$strParam} = substr($strParamLine, $iPos + 1);
            }
        }
    }

    return $strCommand;
}
####################################################################################################################################
# COMMAND_WRITE
####################################################################################################################################
# Writes a command and its optional parameters to hIn.
#
# strCommand - command to write
# oParamRef  - optional reference to a hash of parameters
#
# Without parameters the command is written on a line by itself.  With parameters the command is followed by ":", one name=value
# line per parameter with a defined value (sorted by name), and a final "end" line.
#
# Throws when a parameter name contains =, when a parameter value ends with a linefeed, or when the write fails.
sub command_write
{
    my $self = shift;
    my $strCommand = shift;
    my $oParamRef = shift;

    my $strOutput = $strCommand;

    if (defined($oParamRef))
    {
        $strOutput = "${strCommand}:\n";

        # The hash reference is dereferenced explicitly because calling keys on a reference is a fatal error in Perl 5.24 and
        # later
        foreach my $strParam (sort(keys(%{$oParamRef})))
        {
            # The first = separates name and value when the command is read, so it cannot be part of the name
            if ($strParam =~ /=/)
            {
                confess &log(ASSERT, "param \"${strParam}\" cannot contain = character");
            }

            my $strValue = $oParamRef->{$strParam};

            # Parameters with undefined values are not sent
            if (defined($strValue))
            {
                # Each parameter is terminated by a linefeed, so a value ending in a linefeed would produce an empty line
                if ($strValue =~ /\n\z/)
                {
                    confess &log(ASSERT, "param \"${strParam}\" value cannot end with LF");
                }

                $strOutput .= "${strParam}=${strValue}\n";
            }
        }

        $strOutput .= "end";
    }

    &log(TRACE, "Remote->command_write:\n" . $strOutput);

    if (!syswrite($self->{hIn}, "${strOutput}\n"))
    {
        confess "unable to write command";
    }
}
2014-06-07 20:15:55 +03:00
####################################################################################################################################
# COMMAND_EXECUTE
####################################################################################################################################
# Sends a command with command_write and returns the result of reading its output with output_read.
#
# strCommand      - command to execute
# oParamRef       - optional reference to a hash of parameters
# bOutputRequired - passed to output_read
# strErrorPrefix  - passed to output_read
sub command_execute
{
    my ($self, $strCommand, $oParamRef, $bOutputRequired, $strErrorPrefix) = @_;

    # Send the command
    $self->command_write($strCommand, $oParamRef);

    # Read and return the response
    return $self->output_read($bOutputRequired, $strErrorPrefix);
}
2014-06-07 04:16:24 +03:00
# Remove the keywords imported by Moose (e.g. has) from this package now that the class definition is complete
no Moose;
# Finalize the class definition.
# NOTE(review): there is no trailing "1;" - presumably the value returned by make_immutable serves as the module's true return
# value; confirm.
__PACKAGE__->meta->make_immutable;