1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-03-05 15:05:48 +02:00

Working on decompression.

This commit is contained in:
David Steele 2015-02-26 19:12:22 -05:00
parent 64e1b1cea7
commit 2506bbb21a

View File

@ -13,7 +13,8 @@ use Net::OpenSSH;
use File::Basename;
use POSIX ':sys_wait_h';
use Scalar::Util 'blessed';
use IO::Compress::Gzip qw(gzip $GzipError);
use IO::Compress::Gzip qw($GzipError);
use IO::Uncompress::Gunzip qw($GunzipError);
use lib dirname($0) . '/../lib';
use BackRest::Exception;
@ -37,7 +38,7 @@ use constant
####################################################################################################################################
use constant
{
DEFAULT_BLOCK_SIZE => 1048576
DEFAULT_BLOCK_SIZE => 8388606
};
####################################################################################################################################
@ -384,6 +385,7 @@ sub binary_xfer
my $strBlock;
my $strBlockBuffer;
my $oGzip;
my $bFirst = true;
# Both the in and out streams must be defined
if (!defined($hIn) || !defined($hOut))
@ -391,23 +393,50 @@ sub binary_xfer
confess &log(ASSERT, 'hIn or hOut is not defined');
}
# If this is output and the source is not already compressed
if ($strRemote eq 'out' && !$bSourceCompressed)
{
$oGzip = new IO::Compress::Gzip(\$strBlock, Append => 1);
}
# If this is input and the destination should be uncompressed
elsif ($strRemote eq 'in' && !$bDestinationCompress)
{
# Open Unzip - TBD
}
while (1)
{
# Read from the protocol stream
if ($strRemote eq 'in')
{
# Read the block header (at start or when the previous block is complete)
if (!$bDestinationCompress && $bFirst)
{
$strBlockHeader = $self->read_line($hIn);
if ($strBlockHeader !~ /^block [0-9]+$/)
{
$self->wait_pid();
confess "unable to read block header ${strBlockHeader}";
}
# Parse the block size
$iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1));
if ($iBlockSize != 10)
{
confess &log(ERROR, 'gzip header length should be 10 bytes');
}
$iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize);
&log(INFO, "header block buffer in = ${iBlockBufferIn}");
if (!defined($iBlockBufferIn))
{
my $strError = $!;
$self->wait_pid();
confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
}
$oGzip = new IO::Uncompress::Gunzip(\$strBlockBuffer, Transparent => 0)
or confess "IO::Uncompress::Gunzip failed: $GunzipError";
$bFirst = false;
$iBlockSize = $self->{iBlockSize};
}
# Read the block header (at start or after the previous block is complete)
if ($iBlockInTotal == $iBlockSize)
{
$strBlockHeader = $self->read_line($hIn);
@ -422,24 +451,56 @@ sub binary_xfer
$iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1));
$iBlockTotal += 1;
$iBlockInTotal = 0;
&log(INFO, "block size = ${iBlockSize}");
}
# If the block size > 0 then read from protocol stream. The entire block may not have been read before so keep reading
# until we get it all.
# If block size > 0 then read from protocol stream. The entire block may not be read on the first try so keep
# reading until we get it all.
if ($iBlockSize != 0)
{
$iBlockIn = sysread($hIn, $strBlock, $iBlockSize - $iBlockInTotal);
if (!defined($iBlockIn))
if ($strRemote eq 'in' && !$bDestinationCompress)
{
my $strError = $!;
$iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize - $iBlockInTotal);
$self->wait_pid();
confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
&log(INFO, "block buffer in = ${iBlockBufferIn}");
if (!defined($iBlockBufferIn))
{
my $strError = $!;
$self->wait_pid();
confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
}
$iBlockInTotal += $iBlockBufferIn;
# Decompress the block
$iBlockIn = $oGzip->read($strBlock);
if ($iBlockIn < 0)
{
confess &log(ERROR, "unable to decompress stream: ${GunzipError}");
}
# &log(INFO, "block in = ${iBlockIn} - " . length($strBlock));
}
else
{
$iBlockIn = sysread($hIn, $strBlock, $iBlockSize - $iBlockInTotal);
$iBlockInTotal += $iBlockIn;
if (!defined($iBlockIn))
{
my $strError = $!;
$self->wait_pid();
confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" .
(defined($strError) ? ": ${strError}" : '');
}
$iBlockInTotal += $iBlockIn;
}
}
# Else indicate the stream is complete
else
@ -450,60 +511,83 @@ sub binary_xfer
# Read from file input stream
else
{
# Read a block from the input stream
$iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize);
if (!defined($iBlockBufferIn))
# If source is not already compressed then compress it
if ($strRemote eq 'out' && !$bSourceCompressed)
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
# Write the header
if ($bFirst)
{
$bFirst = false;
$oGzip = new IO::Compress::Gzip(\$strBlock, Append => 0)
or confess "IO::Compress::Gzip failed: $GzipError";
if (!defined($strBlock))
{
confess &log(ERROR, "gzip header block does not exist");
}
$iBlockIn = length($strBlock);
}
else
{
# Read a block from the stream
$iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize);
if (!defined($iBlockBufferIn))
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
# If block size > 0 then compress
if ($iBlockBufferIn > 0)
{
$iBlockIn = $oGzip->syswrite($strBlockBuffer, $iBlockBufferIn);
if (!defined($iBlockIn) || $iBlockIn != $iBlockBufferIn)
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
}
# If there was nothing new to compress then close
if (!defined($strBlock))
{
$oGzip->close();
}
# If there is data in the compressed buffer, then output
if (defined($strBlock))
{
$iBlockIn = length($strBlock);
}
# Else indicate that the stream is complete
else
{
$iBlockIn = 0;
}
}
}
# # Clear the compression output buffer
# undef($strBlock);
# If new data was read from the input stream then compress
if ($iBlockBufferIn > 0)
# If source is already compressed or transfer is not compressed then just read the stream
else
{
$iBlockIn = $oGzip->syswrite($strBlockBuffer, $iBlockBufferIn);
$iBlockIn = sysread($hIn, $strBlockBuffer, $iBlockSize);
if (!defined($iBlockIn) || $iBlockIn != $iBlockBufferIn)
if (!defined($iBlockIn))
{
$self->wait_pid();
confess &log(ERROR, 'unable to read');
}
}
# If there was nothing new to compress then close
if (!defined($strBlock))
{
$oGzip->close();
}
# If there is data in the compressed buffer, then output
if (defined($strBlock))
{
$iBlockIn = length($strBlock);
}
# Else indicate that the stream is complete
else
{
$iBlockIn = 0;
}
}
# Write block header to the protocol stream
if ($strRemote eq 'out')
{
$strBlockHeader = "block ${iBlockIn}\n";
$iBlockOut = syswrite($hOut, $strBlockHeader);
if (!defined($iBlockOut) || $iBlockOut != length($strBlockHeader))
{
$self->wait_pid();
confess 'unable to write block header';
}
$self->write_line($hOut, "block ${iBlockIn}");
}
# Write block to output stream
@ -525,8 +609,6 @@ sub binary_xfer
last;
}
}
# &log(INFO, 'got to end');
}
####################################################################################################################################