diff --git a/lib/BackRest/Remote.pm b/lib/BackRest/Remote.pm index 317468578..1e513010d 100644 --- a/lib/BackRest/Remote.pm +++ b/lib/BackRest/Remote.pm @@ -13,7 +13,8 @@ use Net::OpenSSH; use File::Basename; use POSIX ':sys_wait_h'; use Scalar::Util 'blessed'; -use IO::Compress::Gzip qw(gzip $GzipError); +use IO::Compress::Gzip qw($GzipError); +use IO::Uncompress::Gunzip qw($GunzipError); use lib dirname($0) . '/../lib'; use BackRest::Exception; @@ -37,7 +38,7 @@ use constant #################################################################################################################################### use constant { - DEFAULT_BLOCK_SIZE => 1048576 + DEFAULT_BLOCK_SIZE => 8388606 }; #################################################################################################################################### @@ -384,6 +385,7 @@ sub binary_xfer my $strBlock; my $strBlockBuffer; my $oGzip; + my $bFirst = true; # Both the in and out streams must be defined if (!defined($hIn) || !defined($hOut)) @@ -391,23 +393,50 @@ sub binary_xfer confess &log(ASSERT, 'hIn or hOut is not defined'); } - # If this is output and the source is not already compressed - if ($strRemote eq 'out' && !$bSourceCompressed) - { - $oGzip = new IO::Compress::Gzip(\$strBlock, Append => 1); - } - # If this is input and the destination should be uncompressed - elsif ($strRemote eq 'in' && !$bDestinationCompress) - { - # Open Unzip - TBD - } - while (1) { # Read from the protocol stream if ($strRemote eq 'in') { - # Read the block header (at start or when the previous block is complete) + if (!$bDestinationCompress && $bFirst) + { + $strBlockHeader = $self->read_line($hIn); + + if ($strBlockHeader !~ /^block [0-9]+$/) + { + $self->wait_pid(); + confess "unable to read block header ${strBlockHeader}"; + } + + # Parse the block size + $iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1)); + + if ($iBlockSize != 10) + { + confess &log(ERROR, 'gzip header length should be 10 bytes'); + } + + $iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize); + + &log(INFO, "header block buffer in = ${iBlockBufferIn}"); + + if (!defined($iBlockBufferIn)) + { + my $strError = $!; + + $self->wait_pid(); + confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" . + (defined($strError) ? ": ${strError}" : ''); + } + + $oGzip = new IO::Uncompress::Gunzip(\$strBlockBuffer, Transparent => 0) + or confess "IO::Uncompress::Gunzip failed: $GunzipError"; + + $bFirst = false; + $iBlockSize = $self->{iBlockSize}; + } + + # Read the block header (at start or after the previous block is complete) if ($iBlockInTotal == $iBlockSize) { $strBlockHeader = $self->read_line($hIn); @@ -422,24 +451,56 @@ sub binary_xfer $iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1)); $iBlockTotal += 1; $iBlockInTotal = 0; + + &log(INFO, "block size = ${iBlockSize}"); } - # If the block size > 0 then read from protocol stream. The entire block may not have been read before so keep reading - # until we get it all. + # If block size > 0 then read from protocol stream. The entire block may not be read on the first try so keep + # reading until we get it all. if ($iBlockSize != 0) { - $iBlockIn = sysread($hIn, $strBlock, $iBlockSize - $iBlockInTotal); - - if (!defined($iBlockIn)) + if ($strRemote eq 'in' && !$bDestinationCompress) { - my $strError = $!; + $iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize - $iBlockInTotal); - $self->wait_pid(); - confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" . - (defined($strError) ? ": ${strError}" : ''); + &log(INFO, "block buffer in = ${iBlockBufferIn}"); + + if (!defined($iBlockBufferIn)) + { + my $strError = $!; + + $self->wait_pid(); + confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" . + (defined($strError) ? ": ${strError}" : ''); + } + + $iBlockInTotal += $iBlockBufferIn; + + # Decompress the block + $iBlockIn = $oGzip->read($strBlock); + + if ($iBlockIn < 0) + { + confess &log(ERROR, "unable to decompress stream: ${GunzipError}"); + } + +# &log(INFO, "block in = ${iBlockIn} - " . length($strBlock)); } + else + { + $iBlockIn = sysread($hIn, $strBlock, $iBlockSize - $iBlockInTotal); - $iBlockInTotal += $iBlockIn; + if (!defined($iBlockIn)) + { + my $strError = $!; + + $self->wait_pid(); + confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" . + (defined($strError) ? ": ${strError}" : ''); + } + + $iBlockInTotal += $iBlockIn; + } } # Else indicate the stream is complete else @@ -450,60 +511,83 @@ sub binary_xfer # Read from file input stream else { - # Read a block from the input stream - $iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize); - - if (!defined($iBlockBufferIn)) + # If source is not already compressed then compress it + if ($strRemote eq 'out' && !$bSourceCompressed) { - $self->wait_pid(); - confess &log(ERROR, 'unable to read'); + # Write the header + if ($bFirst) + { + $bFirst = false; + + $oGzip = new IO::Compress::Gzip(\$strBlock, Append => 0) + or confess "IO::Compress::Gzip failed: $GzipError"; + + if (!defined($strBlock)) + { + confess &log(ERROR, "gzip header block does not exist"); + } + + $iBlockIn = length($strBlock); + } + else + { + # Read a block from the stream + $iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize); + + if (!defined($iBlockBufferIn)) + { + $self->wait_pid(); + confess &log(ERROR, 'unable to read'); + } + + # If block size > 0 then compress + if ($iBlockBufferIn > 0) + { + + $iBlockIn = $oGzip->syswrite($strBlockBuffer, $iBlockBufferIn); + + if (!defined($iBlockIn) || $iBlockIn != $iBlockBufferIn) + { + $self->wait_pid(); + confess &log(ERROR, 'unable to read'); + } + } + + # If there was nothing new to compress then close + if (!defined($strBlock)) + { + $oGzip->close(); + } + + # If there is data in the compressed buffer, then output + if (defined($strBlock)) + { + $iBlockIn = length($strBlock); + } + # Else indicate that the stream is complete + else + { + $iBlockIn = 0; + } + } } - - # # Clear the compression output buffer - # undef($strBlock); - - # If new data was read from the input stream then compress - if ($iBlockBufferIn > 0) + # If source is already compressed or transfer is not compressed then just read the stream + else { - $iBlockIn = $oGzip->syswrite($strBlockBuffer, $iBlockBufferIn); + $iBlockIn = sysread($hIn, $strBlockBuffer, $iBlockSize); - if (!defined($iBlockIn) || $iBlockIn != $iBlockBufferIn) + if (!defined($iBlockIn)) { $self->wait_pid(); confess &log(ERROR, 'unable to read'); } } - - # If there was nothing new to compress then close - if (!defined($strBlock)) - { - $oGzip->close(); - } - - # If there is data in the compressed buffer, then output - if (defined($strBlock)) - { - $iBlockIn = length($strBlock); - } - # Else indicate that the stream is complete - else - { - $iBlockIn = 0; - } } # Write block header to the protocol stream if ($strRemote eq 'out') { - $strBlockHeader = "block ${iBlockIn}\n"; - - $iBlockOut = syswrite($hOut, $strBlockHeader); - - if (!defined($iBlockOut) || $iBlockOut != length($strBlockHeader)) - { - $self->wait_pid(); - confess 'unable to write block header'; - } + $self->write_line($hOut, "block ${iBlockIn}"); } # Write block to output stream @@ -525,8 +609,6 @@ sub binary_xfer last; } } - -# &log(INFO, 'got to end'); } ####################################################################################################################################