1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-07-15 01:04:37 +02:00

Integrating IPC::Run - IPC::Open3 is out.

This commit is contained in:
David Steele
2014-06-30 18:35:05 -04:00
parent c300c841eb
commit d0ca69d433
4 changed files with 53 additions and 37 deletions

View File

@ -10,6 +10,8 @@ Simple Postgres Backup and Restore
## feature backlog ## feature backlog
* Fix backup.manifest to not be a binary dump.
* Move backups to be removed to temp before deleting. * Move backups to be removed to temp before deleting.
* Async archive-get. * Async archive-get.
@ -30,16 +32,17 @@ Simple Postgres Backup and Restore
## required perl modules ## required perl modules
* Config::IniFiles * IPC::Run
* Moose
* IPC::System::Simple
* Net::OpenSSH * Net::OpenSSH
* JSON
* IPC::Open3
* Digest::SHA * Digest::SHA
* IO::Compress::Gzip * IO::Compress::Gzip
* IO::Uncompress::Gunzip * IO::Uncompress::Gunzip
* JSON (Only useful with custom IniFile functions, currently not used)
* Config::IniFiles (Not very useful since it does not handle large hashes - should replace with custom functions)
* Moose (Not using many features here, just use standard Perl object syntax?)
* IPC::System::Simple (only used in DB object - should convert to IPC::Run)
## release notes ## release notes
### v0.30: ??? ### v0.30: ???

View File

@ -10,7 +10,6 @@ use Carp;
use Moose; use Moose;
use Net::OpenSSH; use Net::OpenSSH;
use IPC::Open3;
use File::Basename; use File::Basename;
use File::Copy qw(cp); use File::Copy qw(cp);
use File::Path qw(make_path remove_tree); use File::Path qw(make_path remove_tree);

View File

@ -26,7 +26,7 @@ use BackRest::Utility;
#################################################################################################################################### ####################################################################################################################################
use constant use constant
{ {
DEFAULT_BLOCK_SIZE => 8192 #1048576 DEFAULT_BLOCK_SIZE => 1048576
}; };
#################################################################################################################################### ####################################################################################################################################
@ -406,16 +406,19 @@ sub binary_xfer
my $bSourceCompressed = shift; my $bSourceCompressed = shift;
my $bDestinationCompress = shift; my $bDestinationCompress = shift;
# If no remote is defined then set to none
if (!defined($strRemote)) if (!defined($strRemote))
{ {
$strRemote = 'none'; $strRemote = 'none';
} }
# Only set compression defaults when remote is defined
else else
{ {
$bSourceCompressed = defined($bSourceCompressed) ? $bSourceCompressed : false; $bSourceCompressed = defined($bSourceCompressed) ? $bSourceCompressed : false;
$bDestinationCompress = defined($bDestinationCompress) ? $bDestinationCompress : false; $bDestinationCompress = defined($bDestinationCompress) ? $bDestinationCompress : false;
} }
# Working variables
my $iBlockSize = $self->{iBlockSize}; my $iBlockSize = $self->{iBlockSize};
my $iBlockIn; my $iBlockIn;
my $iBlockInTotal = $iBlockSize; my $iBlockInTotal = $iBlockSize;
@ -434,21 +437,28 @@ sub binary_xfer
confess &log(ASSERT, "hIn or hOut is not defined"); confess &log(ASSERT, "hIn or hOut is not defined");
} }
# !!! Convert this to a thread when there is time # If this is output and the source is not already compressed
# Spawn a child process to do compression
if ($strRemote eq "out" && !$bSourceCompressed) if ($strRemote eq "out" && !$bSourceCompressed)
{ {
# Increase the blocksize since we are compressing
$iBlockSize *= 4;
# Open the in/out pipes
pipe $hPipeOut, $hPipeIn; pipe $hPipeOut, $hPipeIn;
# Queue the compression job with the thread
$self->{oThreadQueue}->enqueue("compress:" . fileno($hIn) . ',' . fileno($hPipeIn)); $self->{oThreadQueue}->enqueue("compress:" . fileno($hIn) . ',' . fileno($hPipeIn));
# Wait for the thread to acknowledge that it has duplicated the file handles
my $strMessage = $self->{oThreadResult}->dequeue(); my $strMessage = $self->{oThreadResult}->dequeue();
# Close input pipe so that thread has the only copy, reset hIn to hPipeOut
if ($strMessage eq 'running') if ($strMessage eq 'running')
{ {
close($hPipeIn); close($hPipeIn);
$hIn = $hPipeOut; $hIn = $hPipeOut;
} }
# If any other message is returned then error
else else
{ {
confess "unknown thread message $strMessage"; confess "unknown thread message $strMessage";
@ -457,39 +467,22 @@ sub binary_xfer
# Spawn a child process to do decompression # Spawn a child process to do decompression
elsif ($strRemote eq "in" && !$bDestinationCompress) elsif ($strRemote eq "in" && !$bDestinationCompress)
{ {
# pipe $hPipeOut, $hPipeIn; # Open the in/out pipes
#
# # fork and exit the parent process
# $pId = fork();
#
# if (!$pId)
# {
# close($hPipeIn);
#
# gunzip($hPipeOut => $hOut)
# or exit 1;
# #or die confess &log(ERROR, "unable to uncompress: " . $GunzipError);
#
# close($hPipeOut);
#
# exit 0;
# }
#
# close($hPipeOut);
#
# $hOut = $hPipeIn;
pipe $hPipeOut, $hPipeIn; pipe $hPipeOut, $hPipeIn;
# Queue the decompression job with the thread
$self->{oThreadQueue}->enqueue("decompress:" . fileno($hPipeOut) . ',' . fileno($hOut)); $self->{oThreadQueue}->enqueue("decompress:" . fileno($hPipeOut) . ',' . fileno($hOut));
# Wait for the thread to acknowledge that it has duplicated the file handles
my $strMessage = $self->{oThreadResult}->dequeue(); my $strMessage = $self->{oThreadResult}->dequeue();
# Close output pipe so that thread has the only copy, reset hOut to hPipeIn
if ($strMessage eq 'running') if ($strMessage eq 'running')
{ {
close($hPipeOut); close($hPipeOut);
$hOut = $hPipeIn; $hOut = $hPipeIn;
} }
# If any other message is returned then error
else else
{ {
confess "unknown thread message $strMessage"; confess "unknown thread message $strMessage";

View File

@ -15,11 +15,15 @@ use Carp;
use File::Basename; use File::Basename;
use Cwd 'abs_path'; use Cwd 'abs_path';
use Config::IniFiles; use Config::IniFiles;
use IPC::Run qw(run);
use lib dirname($0) . "/../lib"; use lib dirname($0) . "/../lib";
use BackRest::Utility; use BackRest::Utility;
use BackRest::File; use BackRest::File;
####################################################################################################################################
# Exports
####################################################################################################################################
use Exporter qw(import); use Exporter qw(import);
our @EXPORT = qw(BackRestTestCommon_Setup BackRestTestCommon_Execute BackRestTestCommon_ExecuteBackRest our @EXPORT = qw(BackRestTestCommon_Setup BackRestTestCommon_Execute BackRestTestCommon_ExecuteBackRest
BackRestTestCommon_ConfigCreate BackRestTestCommon_ConfigCreate
@ -29,6 +33,9 @@ our @EXPORT = qw(BackRestTestCommon_Setup BackRestTestCommon_Execute BackRestTes
BackRestTestCommon_ArchivePathGet BackRestTestCommon_DbPathGet BackRestTestCommon_DbCommonPathGet BackRestTestCommon_ArchivePathGet BackRestTestCommon_DbPathGet BackRestTestCommon_DbCommonPathGet
BackRestTestCommon_DbPortGet); BackRestTestCommon_DbPortGet);
####################################################################################################################################
# Module variables
####################################################################################################################################
my $strCommonStanza; my $strCommonStanza;
my $strCommonCommandMain; my $strCommonCommandMain;
my $strCommonCommandRemote; my $strCommonCommandRemote;
@ -49,26 +56,40 @@ my $iCommonDbPort;
#################################################################################################################################### ####################################################################################################################################
sub BackRestTestCommon_Execute sub BackRestTestCommon_Execute
{ {
my $strCommand = shift; my $strCommand = shift; # Command to execute
my $bRemote = shift; my $bRemote = shift; # Execute on remote? This will use the defined BackRest user
my $bSuppressError = shift; my $bSuppressError = shift; # Ignore any errors
# Set defaults # Set defaults
$bRemote = defined($bRemote) ? $bRemote : false; $bRemote = defined($bRemote) ? $bRemote : false;
$bSuppressError = defined($bSuppressError) ? $bSuppressError : false; $bSuppressError = defined($bSuppressError) ? $bSuppressError : false;
# If remote then run the command through ssh
if ($bRemote) if ($bRemote)
{ {
$strCommand = "ssh ${strCommonUserBackRest}\@${strCommonHost} '${strCommand}'"; $strCommand = "ssh ${strCommonUserBackRest}\@${strCommonHost} '${strCommand}'";
} }
if (system($strCommand) != 0) # Run the command
if (!run($strCommand))
{ {
if (!$bSuppressError) if ($bSuppressError)
{ {
confess &log(ERROR, "unable to execute command: ${strCommand}"); return;
} }
confess &log(ERROR, "command \"${strCommand}\" returned: " . $?);
} }
# # Wait for the process to finish and report any errors
# waitpid($pId, 0);
# my $iExitStatus = ${^CHILD_ERROR_NATIVE} >> 8;
#
# if ($iExitStatus != 0)
# {
# confess &log(ERROR, "command \"${strCommand}\" returned " . $iExitStatus); # . ": " .
# # (defined($strError) ? $strError : "[unknown]"));
# }
} }
#################################################################################################################################### ####################################################################################################################################