1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-18 04:58:51 +02:00

Making binary xfer more modular.

This commit is contained in:
David Steele 2015-02-27 13:48:29 -05:00
parent 2506bbb21a
commit bd44b2c471

View File

@ -35,10 +35,10 @@ use constant
####################################################################################################################################
# Remote xfer default block size constant
####################################################################################################################################
#####################################################################################################################################
use constant
{
DEFAULT_BLOCK_SIZE => 8388606
DEFAULT_BLOCK_SIZE => 8388608
};
####################################################################################################################################
@ -347,6 +347,142 @@ sub wait_pid
}
}
####################################################################################################################################
# BLOCK_READ
#
# Read a block from the protocol layer.
####################################################################################################################################
sub block_read
{
my $self = shift;
my $hIn = shift;
my $strBlockRef = shift;
# Read the block header and make sure it's valid
my $strBlockHeader = $self->read_line($hIn);
if ($strBlockHeader !~ /^block [0-9]+$/)
{
$self->wait_pid();
confess "unable to read block header: ${strBlockHeader}";
}
# Get block size from the header
my $iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1));
# If block size is zero then undef the buffer
if ($iBlockSize == 0)
{
undef($$strBlockRef);
}
# Else read the block
else
{
my $iBlockRead = 0;
my $iBlockIn = 0;
# !!! Would be nice to modify this with a non-blocking read
# http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm
# Read as many chucks as it takes to get the full block
while ($iBlockRead != $iBlockSize)
{
$iBlockIn = sysread($hIn, $$strBlockRef, $iBlockSize - $iBlockRead, $iBlockRead);
if (!defined($iBlockIn))
{
my $strError = $!;
$self->wait_pid();
confess "only read ${iBlockRead}/${iBlockSize} block bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
}
$iBlockRead += $iBlockIn;
}
}
# Return the block size
return $iBlockSize;
}
####################################################################################################################################
# BLOCK_WRITE
#
# Write a block to the protocol layer.
####################################################################################################################################
sub block_write
{
my $self = shift;
my $hOut = shift;
my $tBlockRef = shift;
my $iBlockSize = shift;
# If block size is not defined, get it from buffer length
$iBlockSize = defined($iBlockSize) ? $iBlockSize : length($$tBlockRef);
# Write block header to the protocol stream
$self->write_line($hOut, "block ${iBlockSize}");
# Write block if size > 0
if ($iBlockSize > 0)
{
$self->stream_write($hOut, $tBlockRef, $iBlockSize);
}
}
####################################################################################################################################
# STREAM_READ
#
# Read data from a stream.
####################################################################################################################################
sub stream_read
{
my $self = shift;
my $hIn = shift;
my $tBlockRef = shift;
my $iBlockSize = shift;
# Read a block from the stream
my $iBlockIn = sysread($hIn, $$tBlockRef, $iBlockSize);
if (!defined($iBlockIn))
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
return $iBlockIn;
}
####################################################################################################################################
# STREAM_WRITE
#
# Write data to a stream.
####################################################################################################################################
sub stream_write
{
my $self = shift;
my $hOut = shift;
my $tBlockRef = shift;
my $iBlockSize = shift;
# If block size is not defined, get it from buffer length
$iBlockSize = defined($iBlockSize) ? $iBlockSize : length($$tBlockRef);
# Write the block
my $iBlockOut = syswrite($hOut, $$tBlockRef, $iBlockSize);
# Report any errors
if (!defined($iBlockOut) || $iBlockOut != $iBlockSize)
{
my $strError = $!;
$self->wait_pid();
confess "unable to write ${iBlockSize} bytes" . (defined($strError) ? ': ' . $strError : '');
}
}
####################################################################################################################################
# BINARY_XFER
#
@ -436,77 +572,45 @@ sub binary_xfer
$iBlockSize = $self->{iBlockSize};
}
# Read the block header (at start or after the previous block is complete)
if ($iBlockInTotal == $iBlockSize)
# Read block from protocol stream
# &log(INFO, "read new block");
$iBlockSize = $self->block_read($hIn, \$strBlock);
# &log(INFO, "block size = ${iBlockSize}");
if (!$bDestinationCompress)
{
$strBlockHeader = $self->read_line($hIn);
if ($strBlockHeader !~ /^block [0-9]+$/)
{
$self->wait_pid();
confess "unable to read block header ${strBlockHeader}";
}
# Parse the block size
$iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1));
$iBlockTotal += 1;
$iBlockInTotal = 0;
&log(INFO, "block size = ${iBlockSize}");
}
# If block size > 0 then read from protocol stream. The entire block may not be read on the first try so keep
# reading until we get it all.
if ($iBlockSize != 0)
{
if ($strRemote eq 'in' && !$bDestinationCompress)
{
$iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize - $iBlockInTotal);
&log(INFO, "block buffer in = ${iBlockBufferIn}");
if (!defined($iBlockBufferIn))
{
my $strError = $!;
$self->wait_pid();
confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
}
$iBlockInTotal += $iBlockBufferIn;
# Decompress the block
$iBlockIn = $oGzip->read($strBlock);
if ($iBlockIn < 0)
{
confess &log(ERROR, "unable to decompress stream: ${GunzipError}");
}
# $iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize - $iBlockInTotal);
#
# &log(INFO, "block buffer in = ${iBlockBufferIn}");
#
# if (!defined($iBlockBufferIn))
# {
# my $strError = $!;
#
# $self->wait_pid();
# confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" .
# (defined($strError) ? ": ${strError}" : '');
# }
#
# $iBlockInTotal += $iBlockBufferIn;
#
# # Decompress the block
# $iBlockIn = $oGzip->read($strBlock);
#
# if ($iBlockIn < 0)
# {
# confess &log(ERROR, "unable to decompress stream: ${GunzipError}");
# }
# &log(INFO, "block in = ${iBlockIn} - " . length($strBlock));
}
else
{
$iBlockIn = sysread($hIn, $strBlock, $iBlockSize - $iBlockInTotal);
if (!defined($iBlockIn))
{
my $strError = $!;
$self->wait_pid();
confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
}
$iBlockInTotal += $iBlockIn;
}
}
# Else indicate the stream is complete
else
if ($iBlockSize == 0)
{
$iBlockIn = 0;
last;
}
$self->stream_write($hOut, \$strBlock, $iBlockSize);
}
# Read from file input stream
else
@ -514,100 +618,63 @@ sub binary_xfer
# If source is not already compressed then compress it
if ($strRemote eq 'out' && !$bSourceCompressed)
{
# Write the header
# Create the gzip object
if ($bFirst)
{
$bFirst = false;
$oGzip = new IO::Compress::Gzip(\$strBlock, Append => 0)
$oGzip = new IO::Compress::Gzip(\$strBlock, Append => 1)
or confess "IO::Compress::Gzip failed: $GzipError";
if (!defined($strBlock))
{
confess &log(ERROR, "gzip header block does not exist");
}
$iBlockIn = length($strBlock);
}
else
{
# Read a block from the stream
$iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize);
if (!defined($iBlockBufferIn))
# Read a block from the stream
$iBlockBufferIn = $self->stream_read($hIn, \$strBlockBuffer, $iBlockSize);
# If block size > 0 then compress
if ($iBlockBufferIn > 0)
{
$iBlockIn = $oGzip->syswrite($strBlockBuffer, $iBlockBufferIn);
if (!defined($iBlockIn) || $iBlockIn != $iBlockBufferIn)
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
}
# If block size > 0 then compress
if ($iBlockBufferIn > 0)
{
# If there was nothing new to compress then close
if (!defined($strBlock))
{
$oGzip->close();
}
$iBlockIn = $oGzip->syswrite($strBlockBuffer, $iBlockBufferIn);
if (!defined($iBlockIn) || $iBlockIn != $iBlockBufferIn)
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
}
# If there was nothing new to compress then close
if (!defined($strBlock))
{
$oGzip->close();
}
# If there is data in the compressed buffer, then output
if (defined($strBlock))
{
$iBlockIn = length($strBlock);
}
# Else indicate that the stream is complete
else
{
$iBlockIn = 0;
}
# If there is data in the compressed buffer, then output
if (defined($strBlock))
{
$iBlockIn = length($strBlock);
}
# Else indicate that the stream is complete
else
{
# &log(INFO, "set block in 0");
$iBlockIn = 0;
}
}
# If source is already compressed or transfer is not compressed then just read the stream
else
{
$iBlockIn = sysread($hIn, $strBlockBuffer, $iBlockSize);
if (!defined($iBlockIn))
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
$iBlockIn = $self->stream_read($hIn, \$strBlock, $iBlockSize);
}
}
# Write block header to the protocol stream
if ($strRemote eq 'out')
{
$self->write_line($hOut, "block ${iBlockIn}");
}
$self->block_write($hOut, \$strBlock, $iBlockIn);
# Write block to output stream
if ($iBlockIn > 0)
{
$iBlockOut = syswrite($hOut, $strBlock, $iBlockIn);
if (!defined($iBlockOut) || $iBlockOut != $iBlockIn)
if ($iBlockIn == 0)
{
$self->wait_pid();
confess "unable to write ${iBlockIn} bytes" . (defined($!) ? ': ' . $! : '');
last;
}
undef($strBlock);
}
# When there is no more data to write then exit
else
{
last;
}
}
}