1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-01-18 04:58:51 +02:00

ZLib stuff starting to look good. All references removed from File and using binary_xfer for all de/compression.

This commit is contained in:
David Steele 2015-03-01 13:41:35 -05:00
parent 7ede058b45
commit 77bc4238dc
2 changed files with 188 additions and 174 deletions

View File

@ -15,9 +15,6 @@ use File::Path qw(make_path remove_tree);
use Digest::SHA;
use File::stat;
use Fcntl ':mode';
use IO::Compress::Gzip qw(gzip $GzipError);
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use IO::String;
use lib dirname($0) . '/../lib';
use BackRest::Exception;
@ -130,6 +127,11 @@ sub new
$self->{oRemote} = $oRemote;
$self->{iThreadIdx} = $iThreadIdx;
if (!defined($self->{strRemote}) || $self->{strRemote} eq NONE)
{
$self->{oRemote} = new BackRest::Remote();
}
# If remote is defined check parameters and open session
if (defined($self->{strRemote}) && $self->{strRemote} ne NONE)
{
@ -523,25 +525,8 @@ sub compress
# Run locally
else
{
# Compress the file
if (!gzip($strPathOp => "${strPathOp}.gz"))
{
my $strError = "${strPathOp} could not be compressed:" . $!;
my $iErrorCode = COMMAND_ERR_FILE_READ;
if (!$self->exists($strPathType, $strFile))
{
$strError = "${strPathOp} does not exist";
$iErrorCode = COMMAND_ERR_FILE_MISSING;
}
if ($strPathType eq PATH_ABSOLUTE)
{
confess &log(ERROR, $strError, $iErrorCode);
}
confess &log(ERROR, "${strDebug}: " . $strError);
}
# Use copy to compress the file
$self->copy($strPathType, $strFile, $strPathType, "${strFile}.gz", false, true);
# Remove the old file
unlink($strPathOp)
@ -821,53 +806,8 @@ sub hash_size
if ($bCompressed)
{
my $bFirst = true;
my $tCompressedBuffer;
my $tUncompressedBuffer;
my $iBlockSize;
my $iBlockIn;
my $oGzip;
do
{
# Read a block from the file
$iBlockSize = sysread($hFile, $tCompressedBuffer, 4194304,
defined($tCompressedBuffer) ? length($tCompressedBuffer) : 0);
if (!defined($iBlockSize))
{
confess &log(ERROR, "${strFileOp} could not be read: " . $!);
}
# If this is the first block then initialize Gunzip
if ($bFirst)
{
# Initialize Gunzip
$oGzip = new IO::Uncompress::Gunzip(\$tCompressedBuffer, Transparent => 0, BlockSize => 4194304)
or confess "IO::Uncompress::Gunzip failed: $GunzipError";
# Clear first block flag
$bFirst = false;
}
# Loop while there is more data to uncompress
while (!$oGzip->eof())
{
# Decompress the block
$iBlockIn = $oGzip->read($tUncompressedBuffer);
if ($iBlockIn < 0)
{
confess &log(ERROR, "unable to decompress stream ($iBlockIn): ${GunzipError}");
}
$iSize += length($tUncompressedBuffer);
$oSHA->add($tUncompressedBuffer);
}
}
while ($iBlockSize > 0);
$oGzip->close();
($strHash, $iSize) =
$self->{oRemote}->binary_xfer($hFile, undef, 'in', undef, false, false);
}
else
{
@ -888,11 +828,11 @@ sub hash_size
$oSHA->add($tBuffer);
}
while ($iBlockSize > 0);
$strHash = $oSHA->hexdigest();
}
close($hFile);
$strHash = $oSHA->hexdigest();
}
return $strHash, $iSize;
@ -1596,13 +1536,21 @@ sub copy
{
my @stryToken = split(/ /, $strOutput);
if ($bDestinationRemote && ($stryToken[1] eq '?' || $stryToken[1] eq '?'))
if ($bDestinationRemote && ($stryToken[1] eq '?' || $stryToken[2] eq '?'))
{
confess &log(ERROR, "checksum/size should have been returned from remote: ${strOutput}");
}
$strChecksum = $stryToken[1];
$iFileSize = $stryToken[2];
if ($stryToken[1] ne '?')
{
$strChecksum = $stryToken[1];
$iFileSize = $stryToken[2];
}
if ($stryToken[2] ne '?')
{
$iFileSize = $stryToken[2];
}
}
}
else
@ -1637,14 +1585,16 @@ sub copy
# If the source is compressed and the destination is not then decompress
if ($bSourceCompressed && !$bDestinationCompress)
{
gunzip($hSourceFile => $hDestinationFile)
or die confess &log(ERROR, "${strDebug}: unable to uncompress: " . $GunzipError);
($strChecksum, $iFileSize) =
$self->{oRemote}->binary_xfer($hSourceFile, $hDestinationFile, 'in', undef, false, false);
}
# If the source is not compressed and the destination is then compress
elsif (!$bSourceCompressed && $bDestinationCompress)
{
gzip($hSourceFile => $hDestinationFile)
or die confess &log(ERROR, "${strDebug}: unable to compress: " . $GzipError);
($strChecksum, $iFileSize) =
$self->{oRemote}->binary_xfer($hSourceFile, $hDestinationFile, 'out', false, undef, false);
}
# Else straight copy
else
{
cp($hSourceFile, $hDestinationFile)
@ -1690,8 +1640,11 @@ sub copy
# Move the file from tmp to final destination
$self->move(PATH_ABSOLUTE, $strDestinationTmpOp, PATH_ABSOLUTE, $strDestinationOp, true);
# Get the checksum and size
($strChecksum, $iFileSize) = $self->hash_size(PATH_ABSOLUTE, $strDestinationOp, $bDestinationCompress);
# Get the checksum and size if they are not already set
if (!defined($strChecksum) || !defined($iFileSize))
{
($strChecksum, $iFileSize) = $self->hash_size(PATH_ABSOLUTE, $strDestinationOp, $bDestinationCompress);
}
}
return $bResult, $strChecksum, $iFileSize;

View File

@ -15,6 +15,7 @@ use POSIX ':sys_wait_h';
use Scalar::Util 'blessed';
use IO::Compress::Gzip qw($GzipError);
use IO::Uncompress::Gunzip qw($GunzipError);
use Compress::Raw::Zlib;
use lib dirname($0) . '/../lib';
use BackRest::Exception;
@ -357,50 +358,61 @@ sub block_read
my $self = shift;
my $hIn = shift;
my $strBlockRef = shift;
my $bProtocol = shift;
# Read the block header and make sure it's valid
my $strBlockHeader = $self->read_line($hIn);
my $iBlockSize;
if ($strBlockHeader !~ /^block -{0,1}[0-9]+$/)
if ($bProtocol)
{
$self->wait_pid();
confess "unable to read block header ${strBlockHeader}";
}
# Read the block header and make sure it's valid
my $strBlockHeader = $self->read_line($hIn);
# Get block size from the header
my $iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1));
if ($strBlockHeader !~ /^block -{0,1}[0-9]+$/)
{
$self->wait_pid();
confess "unable to read block header ${strBlockHeader}";
}
# If block size is 0 or an error code then undef the buffer
if ($iBlockSize <= 0)
{
undef($$strBlockRef);
# Get block size from the header
$iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1));
# If block size is 0 or an error code then undef the buffer
if ($iBlockSize <= 0)
{
undef($$strBlockRef);
}
# Else read the block
else
{
my $iBlockRead = 0;
my $iBlockIn = 0;
my $iOffset = defined($$strBlockRef) ? length($$strBlockRef) : 0;
# !!! Would be nice to modify this with a non-blocking read
# http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm
# Read as many chunks as it takes to get the full block
while ($iBlockRead != $iBlockSize)
{
$iBlockIn = sysread($hIn, $$strBlockRef, $iBlockSize - $iBlockRead, $iBlockRead + $iOffset);
if (!defined($iBlockIn))
{
my $strError = $!;
$self->wait_pid();
confess "only read ${iBlockRead}/${iBlockSize} block bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
}
$iBlockRead += $iBlockIn;
}
}
}
# Else read the block
else
{
my $iBlockRead = 0;
my $iBlockIn = 0;
my $iOffset = defined($$strBlockRef) ? length($$strBlockRef) : 0;
# !!! Would be nice to modify this with a non-blocking read
# http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm
# Read as many chunks as it takes to get the full block
while ($iBlockRead != $iBlockSize)
{
$iBlockIn = sysread($hIn, $$strBlockRef, $iBlockSize - $iBlockRead, $iBlockRead + $iOffset);
if (!defined($iBlockIn))
{
my $strError = $!;
$self->wait_pid();
confess "only read ${iBlockRead}/${iBlockSize} block bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
}
$iBlockRead += $iBlockIn;
}
$iBlockSize = $self->stream_read($hIn, $strBlockRef, $self->{iBlockSize},
defined($$strBlockRef) ? length($$strBlockRef) : 0);
}
# Return the block size
@ -418,13 +430,16 @@ sub block_write
my $hOut = shift;
my $tBlockRef = shift;
my $iBlockSize = shift;
my $bProtocol = shift;
# If block size is not defined, get it from buffer length
$iBlockSize = defined($iBlockSize) ? $iBlockSize : length($$tBlockRef);
# Write block header to the protocol stream
$self->write_line($hOut, "block ${iBlockSize}");
# &log(INFO, "block ${iBlockSize}");
if ($bProtocol)
{
$self->write_line($hOut, "block ${iBlockSize}");
}
# Write block if size > 0
if ($iBlockSize > 0)
@ -500,6 +515,13 @@ sub binary_xfer
my $strRemote = shift;
my $bSourceCompressed = shift;
my $bDestinationCompress = shift;
my $bProtocol = shift;
# The input stream must be defined (output is optional)
if (!defined($hIn))
{
confess &log(ASSERT, 'hIn is not defined');
}
# If no remote is defined then set to none
if (!defined($strRemote))
@ -513,27 +535,23 @@ sub binary_xfer
$bDestinationCompress = defined($bDestinationCompress) ? $bDestinationCompress : false;
}
# Default protocol to true
$bProtocol = defined($bProtocol) ? $bProtocol : true;
# Working variables
my $iBlockSize = $self->{iBlockSize};
my $iBlockIn;
my $iBlockInTotal = $iBlockSize;
my $iBlockBufferIn;
my $iBlockOut;
my $iBlockTotal = 0;
my $strBlockHeader;
my $strBlock;
my $strBlockBuffer;
my $oGzip = undef;
my $oSHA = undef;
my $iFileSize = undef;
my $bFirst = true;
my $oGzip = undef;
my $oZLib = undef;
my $iZLibStatus;
my $oSHA = undef;
# Both the in and out streams must be defined
if (!defined($hIn) || !defined($hOut))
{
confess &log(ASSERT, 'hIn or hOut is not defined');
}
my $bFirst = true;
while (1)
{
@ -544,9 +562,9 @@ sub binary_xfer
if (!$bDestinationCompress)
{
# Read a block from the protocol stream
$iBlockSize = $self->block_read($hIn, \$strBlockBuffer);
$iBlockSize = $self->block_read($hIn, \$strBlockBuffer, $bProtocol);
# If block size = -1 it means an error happened on the remote we need to exit so it can be returned.
# If block size = -1 it means an error happened on the remote and we need to exit so it can be returned.
if ($iBlockSize == -1)
{
last;
@ -558,22 +576,30 @@ sub binary_xfer
$oSHA = Digest::SHA->new('sha1');
$iFileSize = 0;
if ($iBlockSize == 0)
{
&log(ASSERT, 'first protocol block is zero');
}
# Gunzip doesn't like to be initialized with just the header, so if the first block is 10 bytes then fetch
# another another block to make sure so is at least some payload.
if ($iBlockSize <= 10)
{
$iBlockSize = $self->block_read($hIn, \$strBlockBuffer);
}
# if ($iBlockSize == 0)
# {
# &log(ASSERT, 'first protocol block is zero');
# }
#
# # Gunzip doesn't like to be initialized with just the header, so if the first block is 10 bytes then fetch
# # another another block to make sure so is at least some payload.
# if ($iBlockSize <= 10)
# {
# $iBlockSize = $self->block_read($hIn, \$strBlockBuffer, $bProtocol);
# }
# Initialize Gunzip
$oGzip = new IO::Uncompress::Gunzip(\$strBlockBuffer, Append => 1, Transparent => 0,
BlockSize => $self->{iBlockSize})
or confess "IO::Uncompress::Gunzip failed (${iBlockSize}): $GunzipError";
($oZLib, $iZLibStatus) = new Compress::Raw::Zlib::Inflate(LimitOutput => 1, -WindowBits => WANT_GZIP,
-Bufsize => $self->{iBlockSize});
if ($iZLibStatus != Z_OK)
{
confess &log(ERROR, "unable create a inflation stream: ${iZLibStatus}");
}
# $oGzip = new IO::Uncompress::Gunzip(\$strBlockBuffer, Append => 1, Transparent => 0,
# BlockSize => $self->{iBlockSize})
# or confess "IO::Uncompress::Gunzip failed (${iBlockSize}): $GunzipError";
# Clear first block flag
$bFirst = false;
@ -582,36 +608,67 @@ sub binary_xfer
# If the block contains data, decompress it
if ($iBlockSize > 0)
{
my $iUncompressedTotal = 0;
# my $iUncompressedTotal = 0;
# Loop while there is more data to uncompress
while (!$oGzip->eof())
# # Loop while there is more data to uncompress
# while (!$oGzip->eof())
# {
# # Decompress the block
# $iBlockIn = $oGzip->read($strBlock);
#
# if ($iBlockIn < 0)
# {
# confess &log(ERROR, "unable to decompress stream ($iBlockIn): ${GunzipError}");
# }
#
# $iUncompressedTotal += $iBlockIn;
# }
do
{
# Decompress the block
$iBlockIn = $oGzip->read($strBlock);
$iZLibStatus = $oZLib->inflate($strBlockBuffer, $strBlock);
if ($iBlockIn < 0)
if ($iZLibStatus == Z_OK || $iZLibStatus == Z_BUF_ERROR || $iZLibStatus == Z_STREAM_END)
{
confess &log(ERROR, "unable to decompress stream ($iBlockIn): ${GunzipError}");
$iFileSize += length($strBlock);
$oSHA->add($strBlock);
if (defined($hOut))
{
$self->stream_write($hOut, \$strBlock);
}
}
else
{
$iBlockSize = 0;
last;
}
$iUncompressedTotal += $iBlockIn;
}
while ($iZLibStatus == Z_OK && length($strBlock));
# Write out the uncompressed bytes if there are any
if ($iUncompressedTotal > 0)
{
$oSHA->add($strBlock);
$iFileSize += $iUncompressedTotal;
$self->stream_write($hOut, \$strBlock, $iUncompressedTotal);
undef($strBlock);
}
# # Write out the uncompressed bytes if there are any
# if ($iUncompressedTotal > 0)
# {
# $oSHA->add($strBlock);
# $iFileSize += $iUncompressedTotal;
#
# if (defined($hOut))
# {
# $self->stream_write($hOut, \$strBlock, $iUncompressedTotal);
# }
#
# undef($strBlock);
# }
}
# Else close gzip
else
# Make sure the decompression succeeded
if ($iBlockSize == 0)
{
$iBlockIn = $oGzip->close();
if ($iZLibStatus != Z_STREAM_END)
{
confess &log(ERROR, "unable to inflate stream: gzip returned ${iZLibStatus}");
}
last;
}
}
@ -619,7 +676,7 @@ sub binary_xfer
else
{
# Read a block from the protocol stream
$iBlockSize = $self->block_read($hIn, \$strBlock);
$iBlockSize = $self->block_read($hIn, \$strBlock, $bProtocol);
# If the block contains data, write it
if ($iBlockSize > 0)
@ -669,9 +726,9 @@ sub binary_xfer
confess &log(ERROR, "IO::Compress::Gzip failed: $GzipError");
}
if (defined($strBlock) && length($strBlock) > $self->{iBlockSize})
if (defined($hOut) && defined($strBlock) && length($strBlock) > $self->{iBlockSize})
{
$self->block_write($hOut, \$strBlock);
$self->block_write($hOut, \$strBlock, undef, $bProtocol);
undef($strBlock);
}
}
@ -680,8 +737,12 @@ sub binary_xfer
{
$oGzip->close();
$self->block_write($hOut, \$strBlock);
$self->block_write($hOut, undef, 0);
if (defined($hOut))
{
$self->block_write($hOut, \$strBlock, undef, $bProtocol);
$self->block_write($hOut, undef, 0, $bProtocol);
}
last;
}
}
@ -692,11 +753,11 @@ sub binary_xfer
if ($iBlockIn > 0)
{
$self->block_write($hOut, \$strBlock, $iBlockIn);
$self->block_write($hOut, \$strBlock, $iBlockIn, $bProtocol);
}
else
{
$self->block_write($hOut, undef, 0);
$self->block_write($hOut, undef, 0, $bProtocol);
last;
}
}