You've already forked pgbackrest
							
							
				mirror of
				https://github.com/pgbackrest/pgbackrest.git
				synced 2025-10-30 23:37:45 +02:00 
			
		
		
		
	binary_xfer compress/decompression working without threads. All unit tests passing. Hooray.
This commit is contained in:
		| @@ -380,14 +380,15 @@ sub block_read | ||||
|     { | ||||
|         my $iBlockRead = 0; | ||||
|         my $iBlockIn = 0; | ||||
|         my $iOffset = defined($$strBlockRef) ? length($$strBlockRef) : 0; | ||||
|  | ||||
|         # !!! Would be nice to modify this with a non-blocking read | ||||
|         # http://docstore.mik.ua/orelly/perl/cookbook/ch07_15.htm | ||||
|  | ||||
|         # Read as many chucks as it takes to get the full block | ||||
|         # Read as many chunks as it takes to get the full block | ||||
|         while ($iBlockRead != $iBlockSize) | ||||
|         { | ||||
|             $iBlockIn = sysread($hIn, $$strBlockRef, $iBlockSize - $iBlockRead, $iBlockRead); | ||||
|             $iBlockIn = sysread($hIn, $$strBlockRef, $iBlockSize - $iBlockRead, $iBlockRead + $iOffset); | ||||
|  | ||||
|             if (!defined($iBlockIn)) | ||||
|             { | ||||
| @@ -423,6 +424,7 @@ sub block_write | ||||
|  | ||||
|     # Write block header to the protocol stream | ||||
|     $self->write_line($hOut, "block ${iBlockSize}"); | ||||
| #    &log(INFO, "block ${iBlockSize}"); | ||||
|  | ||||
|     # Write block if size > 0 | ||||
|     if ($iBlockSize > 0) | ||||
| @@ -534,81 +536,82 @@ sub binary_xfer | ||||
|         # Read from the protocol stream | ||||
|         if ($strRemote eq 'in') | ||||
|         { | ||||
|             if (!$bDestinationCompress && $bFirst) | ||||
|             { | ||||
|                 $strBlockHeader = $self->read_line($hIn); | ||||
|  | ||||
|                 if ($strBlockHeader !~ /^block [0-9]+$/) | ||||
|                 { | ||||
|                     $self->wait_pid(); | ||||
|                     confess "unable to read block header ${strBlockHeader}"; | ||||
|                 } | ||||
|  | ||||
|                 # Parse the block size | ||||
|                 $iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, ' ') + 1)); | ||||
|  | ||||
|                 if ($iBlockSize != 10) | ||||
|                 { | ||||
|                     confess &log(ERROR, 'gzip header length should be 10 bytes'); | ||||
|                 } | ||||
|  | ||||
|                 $iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize); | ||||
|  | ||||
|                 &log(INFO, "header block buffer in = ${iBlockBufferIn}"); | ||||
|  | ||||
|                 if (!defined($iBlockBufferIn)) | ||||
|                 { | ||||
|                     my $strError = $!; | ||||
|  | ||||
|                     $self->wait_pid(); | ||||
|                     confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" . | ||||
|                             (defined($strError) ? ": ${strError}" : ''); | ||||
|                 } | ||||
|  | ||||
|                 $oGzip = new IO::Uncompress::Gunzip(\$strBlockBuffer, Transparent => 0) | ||||
|                     or confess "IO::Uncompress::Gunzip failed: $GunzipError"; | ||||
|  | ||||
|                 $bFirst = false; | ||||
|                 $iBlockSize = $self->{iBlockSize}; | ||||
|             } | ||||
|  | ||||
|             # Read block from protocol stream | ||||
|             $iBlockSize = $self->block_read($hIn, \$strBlock); | ||||
|  | ||||
|             # If the destination should not be compressed then decompress | ||||
|             if (!$bDestinationCompress) | ||||
|             { | ||||
|                 # $iBlockBufferIn = sysread($hIn, $strBlockBuffer, $iBlockSize - $iBlockInTotal); | ||||
|                 # | ||||
|                 # &log(INFO, "block buffer in = ${iBlockBufferIn}"); | ||||
|                 # | ||||
|                 # if (!defined($iBlockBufferIn)) | ||||
|                 # { | ||||
|                 #     my $strError = $!; | ||||
|                 # | ||||
|                 #     $self->wait_pid(); | ||||
|                 #     confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" . | ||||
|                 #             (defined($strError) ? ": ${strError}" : ''); | ||||
|                 # } | ||||
|                 # | ||||
|                 # $iBlockInTotal += $iBlockBufferIn; | ||||
|                 # | ||||
|                 # # Decompress the block | ||||
|                 # $iBlockIn = $oGzip->read($strBlock); | ||||
|                 # | ||||
|                 # if ($iBlockIn < 0) | ||||
|                 # { | ||||
|                 #     confess &log(ERROR, "unable to decompress stream: ${GunzipError}"); | ||||
|                 # } | ||||
|                 # Read a block from the protocol stream | ||||
|                 $iBlockSize = $self->block_read($hIn, \$strBlockBuffer); | ||||
|  | ||||
| #                    &log(INFO, "block in = ${iBlockIn} - " . length($strBlock)); | ||||
|                 # If this is the first block then initialize Gunzip | ||||
|                 if ($bFirst) | ||||
|                 { | ||||
|                     # Gunzip doesn't like to be initialized with just the header, so if the first block is 10 bytes then fetch | ||||
|                     # another another block to make sure so is at least some payload. | ||||
|                     if ($iBlockSize == 10) | ||||
|                     { | ||||
|                         $iBlockSize = $self->block_read($hIn, \$strBlockBuffer); | ||||
|                     } | ||||
|  | ||||
|                     # Initialize Gunzip | ||||
|                     $oGzip = new IO::Uncompress::Gunzip(\$strBlockBuffer, Append => 1, Transparent => 0, | ||||
|                                                                           BlockSize => $self->{iBlockSize}) | ||||
|                         or confess "IO::Uncompress::Gunzip failed: $GunzipError"; | ||||
|  | ||||
|                     # Clear first block flag | ||||
|                     $bFirst = false; | ||||
|                 } | ||||
|  | ||||
|                 # If the block contains data, decompress it | ||||
|                 if ($iBlockSize > 0) | ||||
|                 { | ||||
|                     my $iUncompressedTotal = 0; | ||||
|  | ||||
|                     # Loop while there is more data to uncompress | ||||
|                     while (!$oGzip->eof()) | ||||
|                     { | ||||
|                         # Decompress the block | ||||
|                         $iBlockIn = $oGzip->read($strBlock); | ||||
|  | ||||
|                         if ($iBlockIn < 0) | ||||
|                         { | ||||
|                             confess &log(ERROR, "unable to decompress stream ($iBlockIn): ${GunzipError}"); | ||||
|                         } | ||||
|  | ||||
|                         $iUncompressedTotal += $iBlockIn; | ||||
|                     } | ||||
|  | ||||
|                     # Write out the uncompressed bytes if there are any | ||||
|                     if ($iUncompressedTotal > 0) | ||||
|                     { | ||||
|                         $self->stream_write($hOut, \$strBlock, $iUncompressedTotal); | ||||
|                         undef($strBlock); | ||||
|                     } | ||||
|                 } | ||||
|                 # Else close gzip | ||||
|                 else | ||||
|                 { | ||||
|                     $iBlockIn = $oGzip->close(); | ||||
|                     last; | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             if ($iBlockSize == 0) | ||||
|             # If the destination should be compressed then just write out the already compressed stream | ||||
|             else | ||||
|             { | ||||
|                 last; | ||||
|             } | ||||
|                 # Read a block from the protocol stream | ||||
|                 $iBlockSize = $self->block_read($hIn, \$strBlock); | ||||
|  | ||||
|             $self->stream_write($hOut, \$strBlock, $iBlockSize); | ||||
|                 # If the block contains data, write it | ||||
|                 if ($iBlockSize > 0) | ||||
|                 { | ||||
|                     $self->stream_write($hOut, \$strBlock, $iBlockSize); | ||||
|                     undef($strBlock); | ||||
|                 } | ||||
|                 # Else done | ||||
|                 else | ||||
|                 { | ||||
|                     last; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         # Read from file input stream | ||||
|         else | ||||
| @@ -619,10 +622,10 @@ sub binary_xfer | ||||
|                 # Create the gzip object | ||||
|                 if ($bFirst) | ||||
|                 { | ||||
|                     $bFirst = false; | ||||
|  | ||||
|                     $oGzip = new IO::Compress::Gzip(\$strBlock, Append => 1) | ||||
|                         or confess "IO::Compress::Gzip failed: $GzipError"; | ||||
|  | ||||
|                     $bFirst = false; | ||||
|                 } | ||||
|  | ||||
|                 # Read a block from the stream | ||||
|   | ||||
		Reference in New Issue
	
	Block a user