You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-09-16 09:06:18 +02:00
Fixed binary_xfer() issue. Now seems to work in all cases.
This commit is contained in:
@@ -113,7 +113,9 @@ while ($strCommand ne OP_EXIT)
|
|||||||
{
|
{
|
||||||
$oFile->copy(PIPE_STDIN, undef,
|
$oFile->copy(PIPE_STDIN, undef,
|
||||||
PATH_ABSOLUTE, param_get(\%oParamHash, 'destination_file'),
|
PATH_ABSOLUTE, param_get(\%oParamHash, 'destination_file'),
|
||||||
undef, param_get(\%oParamHash, 'destination_compress'));
|
undef, param_get(\%oParamHash, 'destination_compress'),
|
||||||
|
undef, undef, undef,
|
||||||
|
param_get(\%oParamHash, 'destination_path_create'));
|
||||||
|
|
||||||
$oRemote->output_write();
|
$oRemote->output_write();
|
||||||
}
|
}
|
||||||
@@ -151,7 +153,9 @@ while ($strCommand ne OP_EXIT)
|
|||||||
PATH_ABSOLUTE, param_get(\%oParamHash, 'destination_file'),
|
PATH_ABSOLUTE, param_get(\%oParamHash, 'destination_file'),
|
||||||
param_get(\%oParamHash, 'source_compressed'),
|
param_get(\%oParamHash, 'source_compressed'),
|
||||||
param_get(\%oParamHash, 'destination_compress'),
|
param_get(\%oParamHash, 'destination_compress'),
|
||||||
param_get(\%oParamHash, 'ignore_missing_source', false)) ? 'Y' : 'N');
|
param_get(\%oParamHash, 'ignore_missing_source', false).
|
||||||
|
undef, undef,
|
||||||
|
param_get(\%oParamHash, 'destination_path_create')) ? 'Y' : 'N');
|
||||||
}
|
}
|
||||||
elsif ($strCommand eq OP_FILE_MANIFEST)
|
elsif ($strCommand eq OP_FILE_MANIFEST)
|
||||||
{
|
{
|
||||||
|
@@ -153,10 +153,10 @@ sub backup_thread_complete
|
|||||||
$bConfessOnError = true;
|
$bConfessOnError = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!defined($iTimeout))
|
# if (!defined($iTimeout))
|
||||||
{
|
# {
|
||||||
&log(WARN, "no thread timeout was set");
|
# &log(WARN, "no thread timeout was set");
|
||||||
}
|
# }
|
||||||
|
|
||||||
# Wait for all threads to complete and handle errors
|
# Wait for all threads to complete and handle errors
|
||||||
my $iThreadComplete = 0;
|
my $iThreadComplete = 0;
|
||||||
|
@@ -1119,9 +1119,9 @@ sub copy
|
|||||||
|
|
||||||
# Set debug string and log
|
# Set debug string and log
|
||||||
my $strDebug = ($bSourceRemote ? " remote" : " local") . " ${strSourcePathType}" .
|
my $strDebug = ($bSourceRemote ? " remote" : " local") . " ${strSourcePathType}" .
|
||||||
(defined($strSourceOp) ? ":${strSourceFile}" : "") .
|
(defined($strSourceFile) ? ":${strSourceOp}" : "") .
|
||||||
" to" . ($bDestinationRemote ? " remote" : " local") . " ${strDestinationPathType}" .
|
" to" . ($bDestinationRemote ? " remote" : " local") . " ${strDestinationPathType}" .
|
||||||
(defined($strDestinationOp) ? ":${strDestinationFile}" : "") .
|
(defined($strDestinationFile) ? ":${strDestinationOp}" : "") .
|
||||||
", source_compressed = " . ($bSourceCompressed ? "true" : "false") .
|
", source_compressed = " . ($bSourceCompressed ? "true" : "false") .
|
||||||
", destination_compress = " . ($bDestinationCompress ? "true" : "false") .
|
", destination_compress = " . ($bDestinationCompress ? "true" : "false") .
|
||||||
", ignore_missing_source = " . ($bIgnoreMissingSource ? "true" : "false") .
|
", ignore_missing_source = " . ($bIgnoreMissingSource ? "true" : "false") .
|
||||||
@@ -1244,6 +1244,7 @@ sub copy
|
|||||||
{
|
{
|
||||||
$oParamHash{destination_file} = $strDestinationOp;
|
$oParamHash{destination_file} = $strDestinationOp;
|
||||||
$oParamHash{destination_compress} = $bDestinationCompress;
|
$oParamHash{destination_compress} = $bDestinationCompress;
|
||||||
|
$oParamHash{destination_path_create} = $bDestinationPathCreate;
|
||||||
|
|
||||||
$hOut = $self->{oRemote}->{hIn};
|
$hOut = $self->{oRemote}->{hIn};
|
||||||
}
|
}
|
||||||
@@ -1257,6 +1258,7 @@ sub copy
|
|||||||
$oParamHash{source_compressed} = $bSourceCompressed;
|
$oParamHash{source_compressed} = $bSourceCompressed;
|
||||||
$oParamHash{destination_file} = $strDestinationOp;
|
$oParamHash{destination_file} = $strDestinationOp;
|
||||||
$oParamHash{destination_compress} = $bDestinationCompress;
|
$oParamHash{destination_compress} = $bDestinationCompress;
|
||||||
|
$oParamHash{destination_path_create} = $bDestinationPathCreate;
|
||||||
|
|
||||||
if ($bIgnoreMissingSource)
|
if ($bIgnoreMissingSource)
|
||||||
{
|
{
|
||||||
|
@@ -24,7 +24,7 @@ use BackRest::Utility;
|
|||||||
####################################################################################################################################
|
####################################################################################################################################
|
||||||
use constant
|
use constant
|
||||||
{
|
{
|
||||||
DEFAULT_BLOCK_SIZE => 1048576
|
DEFAULT_BLOCK_SIZE => 8192 #1048576
|
||||||
};
|
};
|
||||||
|
|
||||||
####################################################################################################################################
|
####################################################################################################################################
|
||||||
@@ -338,9 +338,12 @@ sub binary_xfer
|
|||||||
|
|
||||||
my $iBlockSize = $self->{iBlockSize};
|
my $iBlockSize = $self->{iBlockSize};
|
||||||
my $iBlockIn;
|
my $iBlockIn;
|
||||||
|
my $iBlockInTotal = $iBlockSize;
|
||||||
my $iBlockOut;
|
my $iBlockOut;
|
||||||
|
my $iBlockTotal = 0;
|
||||||
my $strBlockHeader;
|
my $strBlockHeader;
|
||||||
my $strBlock;
|
my $strBlock;
|
||||||
|
# my $strBlockMore;
|
||||||
my $oGzip;
|
my $oGzip;
|
||||||
my $hPipeIn;
|
my $hPipeIn;
|
||||||
my $hPipeOut;
|
my $hPipeOut;
|
||||||
@@ -408,24 +411,52 @@ sub binary_xfer
|
|||||||
{
|
{
|
||||||
if ($strRemote eq 'in')
|
if ($strRemote eq 'in')
|
||||||
{
|
{
|
||||||
|
if ($iBlockInTotal == $iBlockSize)
|
||||||
$strBlockHeader = $self->read_line($hIn);
|
|
||||||
|
|
||||||
if ($strBlockHeader !~ /^block [0-9]+$/)
|
|
||||||
{
|
{
|
||||||
confess "unable to read block header ${strBlockHeader}";
|
$strBlockHeader = $self->read_line($hIn);
|
||||||
|
|
||||||
|
if ($strBlockHeader !~ /^block [0-9]+$/)
|
||||||
|
{
|
||||||
|
$self->wait_pid();
|
||||||
|
confess "unable to read block header ${strBlockHeader}";
|
||||||
|
}
|
||||||
|
|
||||||
|
$iBlockInTotal = 0;
|
||||||
|
$iBlockTotal += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
$iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, " ") + 1));
|
$iBlockSize = trim(substr($strBlockHeader, index($strBlockHeader, " ") + 1));
|
||||||
|
|
||||||
if ($iBlockSize != 0)
|
if ($iBlockSize != 0)
|
||||||
{
|
{
|
||||||
$iBlockIn = sysread($hIn, $strBlock, $iBlockSize);
|
# print "looking for a block of size"
|
||||||
|
|
||||||
if (!defined($iBlockIn) || $iBlockIn != $iBlockSize)
|
$iBlockIn = sysread($hIn, $strBlock, $iBlockSize - $iBlockInTotal);
|
||||||
|
|
||||||
|
# while (defined($iBlockIn) && $iBlockIn != $iBlockSize)
|
||||||
|
# {
|
||||||
|
# $iBlockInMore = sysread($hIn, $strBlockMore, $iBlockSize - $iBlockIn);
|
||||||
|
#
|
||||||
|
# confess "able to read $iBlockInMore bytes after reading $iBlockIn bytes";
|
||||||
|
#
|
||||||
|
# if (!defined($iBlockInMore))
|
||||||
|
# {
|
||||||
|
# $iBlockIn = undef;
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# $iBlockIn += $iBlockInMore;
|
||||||
|
# $strBlock += $strBlockMore;
|
||||||
|
# }
|
||||||
|
|
||||||
|
if (!defined($iBlockIn))
|
||||||
{
|
{
|
||||||
confess "unable to read ${iBlockSize} bytes from remote" . (defined($!) ? ": " . $! : "");
|
$self->wait_pid();
|
||||||
|
confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" .
|
||||||
|
(defined($iBlockIn) ? " (only ${iBlockIn} bytes read)" : " (nothing read)") .
|
||||||
|
(defined($!) ? ": " . $! : "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$iBlockInTotal += $iBlockIn;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -438,6 +469,7 @@ sub binary_xfer
|
|||||||
|
|
||||||
if (!defined($iBlockIn))
|
if (!defined($iBlockIn))
|
||||||
{
|
{
|
||||||
|
$self->wait_pid();
|
||||||
confess &log(ERROR, "unable to read");
|
confess &log(ERROR, "unable to read");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -450,6 +482,7 @@ sub binary_xfer
|
|||||||
|
|
||||||
if (!defined($iBlockOut) || $iBlockOut != length($strBlockHeader))
|
if (!defined($iBlockOut) || $iBlockOut != length($strBlockHeader))
|
||||||
{
|
{
|
||||||
|
$self->wait_pid();
|
||||||
confess "unable to write block header";
|
confess "unable to write block header";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -460,6 +493,7 @@ sub binary_xfer
|
|||||||
|
|
||||||
if (!defined($iBlockOut) || $iBlockOut != $iBlockIn)
|
if (!defined($iBlockOut) || $iBlockOut != $iBlockIn)
|
||||||
{
|
{
|
||||||
|
$self->wait_pid();
|
||||||
confess "unable to write ${iBlockIn} bytes" . (defined($!) ? ": " . $! : "");
|
confess "unable to write ${iBlockIn} bytes" . (defined($!) ? ": " . $! : "");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -129,8 +129,8 @@ sub BackRestTestBackup_Test
|
|||||||
|
|
||||||
BackRestTestBackup_Setup();
|
BackRestTestBackup_Setup();
|
||||||
|
|
||||||
BackRestTestCommon_ConfigCreate(BackRestTestCommon_DbPathGet() . '/pg_backrest.conf', REMOTE_DB);#, REMOTE_BACKUP);
|
BackRestTestCommon_ConfigCreate(BackRestTestCommon_DbPathGet() . '/pg_backrest.conf', REMOTE_DB, REMOTE_BACKUP);
|
||||||
BackRestTestCommon_ConfigCreate(BackRestTestCommon_BackupPathGet() . '/pg_backrest.conf', REMOTE_BACKUP);#, REMOTE_DB);
|
BackRestTestCommon_ConfigCreate(BackRestTestCommon_BackupPathGet() . '/pg_backrest.conf', REMOTE_BACKUP, REMOTE_DB);
|
||||||
|
|
||||||
BackRestTestCommon_Execute(BackRestTestCommon_CommandMainGet() . ' --config=' . BackRestTestCommon_BackupPathGet() .
|
BackRestTestCommon_Execute(BackRestTestCommon_CommandMainGet() . ' --config=' . BackRestTestCommon_BackupPathGet() .
|
||||||
"/pg_backrest.conf --type=incr --stanza=${strStanza} backup");
|
"/pg_backrest.conf --type=incr --stanza=${strStanza} backup");
|
||||||
|
@@ -133,6 +133,7 @@ sub BackRestTestCommon_ConfigCreate
|
|||||||
|
|
||||||
$oParamHash{$strCommonStanza}{'path'} = $strCommonDbCommonPath;
|
$oParamHash{$strCommonStanza}{'path'} = $strCommonDbCommonPath;
|
||||||
$oParamHash{'global:backup'}{'path'} = $strCommonBackupPath;
|
$oParamHash{'global:backup'}{'path'} = $strCommonBackupPath;
|
||||||
|
$oParamHash{'global:backup'}{'thread-max'} = '8';
|
||||||
|
|
||||||
$oParamHash{'global:log'}{'level-console'} = 'debug';
|
$oParamHash{'global:log'}{'level-console'} = 'debug';
|
||||||
$oParamHash{'global:log'}{'level-file'} = 'trace';
|
$oParamHash{'global:log'}{'level-file'} = 'trace';
|
||||||
|
Reference in New Issue
Block a user