1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-09-16 09:06:18 +02:00

Full refactor of binary_xfer.

Compression now uses Compress::Raw::Zlib.
This commit is contained in:
David Steele
2015-03-01 15:17:57 -05:00
parent 77bc4238dc
commit 393525d280

View File

@@ -13,8 +13,6 @@ use Net::OpenSSH;
use File::Basename;
use POSIX ':sys_wait_h';
use Scalar::Util 'blessed';
use IO::Compress::Gzip qw($GzipError);
use IO::Uncompress::Gunzip qw($GunzipError);
use Compress::Raw::Zlib;
use lib dirname($0) . '/../lib';
@@ -505,7 +503,7 @@ sub stream_write
# BINARY_XFER
#
# Copies data from one file handle to another, optionally compressing or decompressing the data in stream. If $strRemote != none
# then one side is a protocol stream.
# then one side is a protocol stream, though this can be controlled with the bProtocol param.
####################################################################################################################################
sub binary_xfer
{
@@ -538,157 +536,100 @@ sub binary_xfer
# Default protocol to true
$bProtocol = defined($bProtocol) ? $bProtocol : true;
# Working variables
my $iBlockSize = $self->{iBlockSize};
my $iBlockIn;
my $iBlockBufferIn;
my $strBlock;
my $strBlockBuffer;
# Checksum and size
my $oSHA = undef;
my $iFileSize = undef;
my $oGzip = undef;
my $oZLib = undef;
my $iZLibStatus;
my $oSHA = undef;
my $bFirst = true;
while (1)
{
# Read from the protocol stream
if ($strRemote eq 'in')
{
# If the destination should not be compressed then decompress
if (!$bDestinationCompress)
{
# Read a block from the protocol stream
$iBlockSize = $self->block_read($hIn, \$strBlockBuffer, $bProtocol);
my $iBlockSize;
my $tCompressedBuffer;
my $tUncompressedBuffer;
my $iUncompressedBufferSize;
# If block size = -1 it means an error happened on the remote and we need to exit so it can be returned.
if ($iBlockSize == -1)
{
last;
}
# If this is the first block then initialize Gunzip
if ($bFirst)
{
# Initialize checksum and filesize
$oSHA = Digest::SHA->new('sha1');
$iFileSize = 0;
# if ($iBlockSize == 0)
# {
# &log(ASSERT, 'first protocol block is zero');
# }
#
# # Gunzip doesn't like to be initialized with just the header, so if the first block is 10 bytes then fetch
# # another another block to make sure so is at least some payload.
# if ($iBlockSize <= 10)
# {
# $iBlockSize = $self->block_read($hIn, \$strBlockBuffer, $bProtocol);
# }
# Initialize Gunzip
($oZLib, $iZLibStatus) = new Compress::Raw::Zlib::Inflate(LimitOutput => 1, -WindowBits => WANT_GZIP,
-Bufsize => $self->{iBlockSize});
# Initialize inflate object and check for errors
my ($oZLib, $iZLibStatus) =
new Compress::Raw::Zlib::Inflate(WindowBits => WANT_GZIP, Bufsize => $self->{iBlockSize}, LimitOutput => 1);
if ($iZLibStatus != Z_OK)
{
confess &log(ERROR, "unable create a inflation stream: ${iZLibStatus}");
confess &log(ERROR, "unable create a inflate object: ${iZLibStatus}");
}
# $oGzip = new IO::Uncompress::Gunzip(\$strBlockBuffer, Append => 1, Transparent => 0,
# BlockSize => $self->{iBlockSize})
# or confess "IO::Uncompress::Gunzip failed (${iBlockSize}): $GunzipError";
# Clear first block flag
$bFirst = false;
}
# Read all input
do
{
# Read a block from the input stream
$iBlockSize = $self->block_read($hIn, \$tCompressedBuffer, $bProtocol);
# If the block contains data, decompress it
if ($iBlockSize > 0)
{
# my $iUncompressedTotal = 0;
# # Loop while there is more data to uncompress
# while (!$oGzip->eof())
# {
# # Decompress the block
# $iBlockIn = $oGzip->read($strBlock);
#
# if ($iBlockIn < 0)
# {
# confess &log(ERROR, "unable to decompress stream ($iBlockIn): ${GunzipError}");
# }
#
# $iUncompressedTotal += $iBlockIn;
# }
# Keep looping while there is more to decompress
do
{
$iZLibStatus = $oZLib->inflate($strBlockBuffer, $strBlock);
# Decompress data
$iZLibStatus = $oZLib->inflate($tCompressedBuffer, $tUncompressedBuffer);
$iUncompressedBufferSize = length($tUncompressedBuffer);
# If status is ok, write the data
if ($iZLibStatus == Z_OK || $iZLibStatus == Z_BUF_ERROR || $iZLibStatus == Z_STREAM_END)
{
$iFileSize += length($strBlock);
$oSHA->add($strBlock);
# Add data to filesize and checksum
$iFileSize += $iUncompressedBufferSize;
$oSHA->add($tUncompressedBuffer);
# Write data if hOut is defined
if (defined($hOut))
{
$self->stream_write($hOut, \$strBlock);
$self->stream_write($hOut, \$tUncompressedBuffer, $iUncompressedBufferSize);
}
}
# Else error, exit so it can be handled
else
{
$iBlockSize = 0;
last;
}
}
while ($iZLibStatus == Z_OK && length($strBlock));
# # Write out the uncompressed bytes if there are any
# if ($iUncompressedTotal > 0)
# {
# $oSHA->add($strBlock);
# $iFileSize += $iUncompressedTotal;
#
# if (defined($hOut))
# {
# $self->stream_write($hOut, \$strBlock, $iUncompressedTotal);
# }
#
# undef($strBlock);
# }
while ($iZLibStatus == Z_OK && $iUncompressedBufferSize > 0);
}
# Make sure the decompression succeeded
if ($iBlockSize == 0)
{
if ($iZLibStatus != Z_STREAM_END)
{
confess &log(ERROR, "unable to inflate stream: gzip returned ${iZLibStatus}");
}
while ($iBlockSize > 0);
last;
# Make sure the decompression succeeded (iBlockSize < 0 indicates remote error, handled later)
if ($iBlockSize == 0 && $iZLibStatus != Z_STREAM_END)
{
confess &log(ERROR, "unable to inflate stream: ${iZLibStatus}");
}
}
# If the destination should be compressed then just write out the already compressed stream
else
{
my $iBlockSize;
my $tBuffer;
do
{
# Read a block from the protocol stream
$iBlockSize = $self->block_read($hIn, \$strBlock, $bProtocol);
$iBlockSize = $self->block_read($hIn, \$tBuffer, $bProtocol);
# If the block contains data, write it
if ($iBlockSize > 0)
{
$self->stream_write($hOut, \$strBlock, $iBlockSize);
undef($strBlock);
$self->stream_write($hOut, \$tBuffer, $iBlockSize);
undef($tBuffer);
}
# Else done
else
{
last;
}
while ($iBlockSize > 0);
}
}
# Read from file input stream
@@ -697,70 +638,108 @@ sub binary_xfer
# If source is not already compressed then compress it
if ($strRemote eq 'out' && !$bSourceCompressed)
{
# Create the gzip object
if ($bFirst)
{
my $iBlockSize;
my $tCompressedBuffer;
my $iCompressedBufferSize;
my $tUncompressedBuffer;
# Initialize checksum
$oSHA = Digest::SHA->new('sha1');
$iFileSize = 0;
$oGzip = new IO::Compress::Gzip(\$strBlock, Append => 1)
or confess "IO::Compress::Gzip failed: $GzipError";
# Initialize inflate object and check for errors
my ($oZLib, $iZLibStatus) =
new Compress::Raw::Zlib::Deflate(WindowBits => WANT_GZIP, Bufsize => $self->{iBlockSize}, AppendOutput => 1);
# Clear first block flag
$bFirst = false;
if ($iZLibStatus != Z_OK)
{
confess &log(ERROR, "unable create a deflate object: ${iZLibStatus}");
}
do
{
# Read a block from the stream
$iBlockBufferIn = $self->stream_read($hIn, \$strBlockBuffer, $iBlockSize);
$oSHA->add($strBlockBuffer);
$iFileSize += $iBlockBufferIn;
$iBlockSize = $self->stream_read($hIn, \$tUncompressedBuffer, $self->{iBlockSize});
# If block size > 0 then compress
if ($iBlockBufferIn > 0)
if ($iBlockSize > 0)
{
$iBlockIn = $oGzip->syswrite($strBlockBuffer, $iBlockBufferIn);
# Update checksum and filesize
$oSHA->add($tUncompressedBuffer);
if (!defined($iBlockIn) || $iBlockIn != $iBlockBufferIn)
{
$self->wait_pid();
confess &log(ERROR, "IO::Compress::Gzip failed: $GzipError");
}
# Compress the data
$iZLibStatus = $oZLib->deflate($tUncompressedBuffer, $tCompressedBuffer);
$iCompressedBufferSize = length($tCompressedBuffer);
if (defined($hOut) && defined($strBlock) && length($strBlock) > $self->{iBlockSize})
# If compression was successful
if ($iZLibStatus == Z_OK)
{
$self->block_write($hOut, \$strBlock, undef, $bProtocol);
undef($strBlock);
# The compressed data is larger than block size, then write
if ($iCompressedBufferSize > $self->{iBlockSize})
{
$self->block_write($hOut, \$tCompressedBuffer, $iCompressedBufferSize, $bProtocol);
undef($tCompressedBuffer);
}
}
# If there was nothing new to compress then close
# Else if error
else
{
$oGzip->close();
$iBlockSize = 0;
last;
}
}
}
while ($iBlockSize > 0);
# If good so far flush out the last bytes
if ($iZLibStatus == Z_OK)
{
$iZLibStatus = $oZLib->flush($tCompressedBuffer);
}
# Make sure the compression succeeded
if ($iZLibStatus != Z_OK)
{
confess &log(ERROR, "unable to deflate stream: ${iZLibStatus}");
}
# Write out the last block
if (defined($hOut))
{
$self->block_write($hOut, \$strBlock, undef, $bProtocol);
$iCompressedBufferSize = length($tCompressedBuffer);
if ($iCompressedBufferSize > 0)
{
$self->block_write($hOut, \$tCompressedBuffer, $iCompressedBufferSize, $bProtocol);
}
$self->block_write($hOut, undef, 0, $bProtocol);
}
last;
}
# Get total uncompressed bytes written
$iFileSize = $oZLib->total_in();
}
# If source is already compressed or transfer is not compressed then just read the stream
else
{
$iBlockIn = $self->stream_read($hIn, \$strBlock, $iBlockSize);
my $iBlockSize;
my $tBuffer;
if ($iBlockIn > 0)
# Read input
do
{
$self->block_write($hOut, \$strBlock, $iBlockIn, $bProtocol);
$iBlockSize = $self->stream_read($hIn, \$tBuffer, $self->{iBlockSize});
# Write a block if size > 0
if ($iBlockSize > 0)
{
$self->block_write($hOut, \$tBuffer, $iBlockSize, $bProtocol);
}
else
{
}
while ($iBlockSize > 0);
# Write 0 block to indicate end of stream
$self->block_write($hOut, undef, 0, $bProtocol);
last;
}
}
}
}