1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-18 04:58:51 +02:00
pgbackrest/lib/BackRest/Protocol/IO.pm
David Steele f8a2da9400 Work on issue #48: Abandon threads and go to processes
* Major refactoring of the protocol layer to support this work.
* Fixed protocol issue that was preventing ssh errors (especially connect) from being logged.
2015-08-05 08:43:41 -04:00

320 lines
9.3 KiB
Perl

####################################################################################################################################
# PROTOCOL IO MODULE
####################################################################################################################################
package BackRest::Protocol::IO;
use strict;
use warnings FATAL => qw(all);
use Carp qw(confess);
use File::Basename qw(dirname);
use IPC::Open3;
use POSIX qw(:sys_wait_h);
use lib dirname($0) . '/../lib';
use BackRest::Exception;
use BackRest::Utility;
####################################################################################################################################
# Operation constants
####################################################################################################################################
use constant OP_IO_PROTOCOL => 'IO';
use constant OP_IO_PROTOCOL_NEW => OP_IO_PROTOCOL . "->new";
use constant OP_IO_PROTOCOL_NEW3 => OP_IO_PROTOCOL . "->new3";
####################################################################################################################################
# CONSTRUCTOR
####################################################################################################################################
sub new
{
my $class = shift; # Class name
my $hIn = shift; # Input stream
my $hOut = shift; # Output stream
my $hErr = shift; # Error stream
my $pId = shift; # Process ID
# Debug
logTrace(OP_IO_PROTOCOL_NEW3, DEBUG_CALL);
# Create the class hash
my $self = {};
bless $self, $class;
$self->{hIn} = $hIn;
$self->{hOut} = $hOut;
$self->{hErr} = $hErr;
$self->{pId} = $pId;
return $self;
}
####################################################################################################################################
# new3
#
# Use open3 to run the command and get the io handles.
####################################################################################################################################
sub new3
{
my $class = shift;
my $strCommand = shift;
# Debug
logTrace(OP_IO_PROTOCOL_NEW3, DEBUG_CALL, undef, {command => \$strCommand});
# Use open3 to run the command
my ($pId, $hIn, $hOut, $hErr);
$pId = IPC::Open3::open3($hIn, $hOut, $hErr, $strCommand);
# Return the IO class
return $class->new($hOut, $hIn, $hErr, $pId);
}
####################################################################################################################################
# Get pid/input/output handles
####################################################################################################################################
sub hInGet
{
my $self = shift;
return $self->{hIn};
}
sub hOutGet
{
my $self = shift;
return $self->{hOut};
}
sub pIdGet
{
my $self = shift;
return $self->{pId};
}
####################################################################################################################################
# kill
####################################################################################################################################
sub kill
{
my $self = shift;
if (defined($self->{pId}))
{
kill 'KILL', $self->{pId};
waitpid($self->{pId}, 0);
undef($self->{pId});
undef($self->{hIn});
undef($self->{hOut});
undef($self->{hErr});
}
}
####################################################################################################################################
# lineRead
#
# Read a line.
####################################################################################################################################
sub lineRead
{
my $self = shift;
my $bError = shift;
my $strLine;
my $strChar;
my $iByteIn;
# my $oOutSelect = IO::Select->new();
# $oOutSelect->add($self->{hOut});
#
# if ($oOutSelect->can_read(2))
# {
# $strLine = readline($self->{hOut});
# $strLine = trim($strLine);
# }
while (1)
{
$iByteIn = sysread($self->{hIn}, $strChar, 1);
if (!defined($iByteIn) || $iByteIn != 1)
{
$self->waitPid();
if (defined($bError) and !$bError)
{
return undef;
}
confess &log(ERROR, 'unable to read 1 byte' . (defined($!) ? ': ' . $! : ''));
}
if ($strChar eq "\n")
{
last;
}
$strLine .= $strChar;
}
return $strLine;
}
####################################################################################################################################
# errorWrite
#
# Write error data to the stream.
####################################################################################################################################
sub errorWrite
{
my $self = shift;
my $strBuffer = shift;
if (defined($self->{pId}))
{
confess &log(ASSERT, 'errorWrite() not valid got master process.');
}
$self->lineWrite($strBuffer, $self->{hError});
}
####################################################################################################################################
# lineWrite
#
# Write line to the stream.
####################################################################################################################################
sub lineWrite
{
my $self = shift;
my $strBuffer = shift;
my $hOut = shift;
my $iLineOut = syswrite(defined($hOut) ? $hOut : $self->{hOut}, (defined($strBuffer) ? $strBuffer : '') . "\n");
if (!defined($iLineOut) || $iLineOut != (defined($strBuffer) ? length($strBuffer) : 0) + 1)
{
confess &log(ERROR, "unable to write ${strBuffer}: $!", ERROR_PROTOCOL);
}
}
####################################################################################################################################
# bufferRead
#
# Read data from a stream.
####################################################################################################################################
sub bufferRead
{
my $self = shift;
my $tBlockRef = shift;
my $iBlockSize = shift;
my $iOffset = shift;
# Read a block from the stream
my $iBlockIn = sysread($self->{hIn}, $$tBlockRef, $iBlockSize, defined($iOffset) ? $iOffset : 0);
if (!defined($iBlockIn))
{
$self->waitPid();
confess &log(ERROR, 'unable to read');
}
return $iBlockIn;
}
####################################################################################################################################
# bufferWrite
#
# Write data to a stream.
####################################################################################################################################
sub bufferWrite
{
my $self = shift;
my $tBlockRef = shift;
my $iBlockSize = shift;
# If block size is not defined, get it from buffer length
$iBlockSize = defined($iBlockSize) ? $iBlockSize : length($$tBlockRef);
# Write the block
my $iBlockOut = syswrite($self->{hOut}, $$tBlockRef, $iBlockSize);
# Report any errors
if (!defined($iBlockOut) || $iBlockOut != $iBlockSize)
{
my $strError = $!;
$self->waitPid();
confess "unable to write ${iBlockSize} bytes" . (defined($strError) ? ': ' . $strError : '');
}
}
####################################################################################################################################
# waitPid
#
# See if the remote process has terminated unexpectedly.
####################################################################################################################################
sub waitPid
{
my $self = shift;
my $fWaitTime = shift;
my $bReportError = shift;
# Record the start time and set initial sleep interval
my $oWait = waitInit($fWaitTime);
if (defined($self->{pId}))
{
do
{
my $iResult = waitpid($self->{pId}, WNOHANG);
if (defined($fWaitTime))
{
confess &log(TRACE, "waitpid result = $iResult");
}
# If there is no such process
if ($iResult == -1)
{
return true;
}
if ($iResult > 0)
{
if (!defined($bReportError) || $bReportError)
{
my $strError = 'no error on stderr';
if (!defined($self->{hErr}))
{
$strError = 'no error captured because stderr is already closed';
}
else
{
$strError = $self->pipeToString($self->{hErr});
}
$self->{pId} = undef;
$self->{hIn} = undef;
$self->{hOut} = undef;
$self->{hErr} = undef;
confess &log(ERROR, "remote process terminated: ${strError}", ERROR_HOST_CONNECT);
}
return true;
}
&log(TRACE, "waiting for pid");
}
while (waitMore($oWait));
}
return false;
}
1;