You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-07-17 01:12:23 +02:00
pgBackRest is now pure C.
Remove embedded Perl from the distributed binary. This includes code, configure, Makefile, and packages. The distributed binary is now pure C. Remove storagePathEnforceSet() from the C Storage object which allowed Perl to write outside of the storage base directory. Update mock/all and real/all integration tests to use storageLocal() where they were violating this rule. Remove "c" option that allowed the remote to tell if it was being called from C or Perl. Code to convert options to JSON for passing to Perl (perl/config.c) has been moved to LibC since it is still required for Perl integration tests. Update build and installation instructions in the user guide. Remove all Perl unit tests. Remove obsolete Perl code. In particular this included all the Perl protocol code which required modifications to the Perl storage, manifest, and db objects that are still required for integration testing but only run locally. Any remaining Perl code is required for testing, documentation, or code generation. Rename perlReq to binReq in define.yaml to indicate that the binary is required for a test. This had been the actual meaning for quite some time but the key was never renamed.
This commit is contained in:
@ -1,293 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# Protocol Master Base
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Base::Master;
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
use English '-no_match_vars';
|
||||
|
||||
use Exporter qw(import);
|
||||
our @EXPORT = qw();
|
||||
use Time::HiRes qw(gettimeofday);
|
||||
use JSON::PP;
|
||||
|
||||
use pgBackRest::Common::Exception;
|
||||
use pgBackRest::Common::Ini;
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Version;
|
||||
|
||||
####################################################################################################################################
|
||||
# Operation constants
|
||||
####################################################################################################################################
|
||||
use constant OP_NOOP => 'noop';
|
||||
push @EXPORT, qw(OP_NOOP);
|
||||
use constant OP_EXIT => 'exit';
|
||||
push @EXPORT, qw(OP_EXIT);
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift; # Class name
|
||||
|
||||
# Create the class hash
|
||||
my $self = {};
|
||||
bless $self, $class;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
(
|
||||
my $strOperation,
|
||||
$self->{strName},
|
||||
$self->{strId},
|
||||
$self->{oIo},
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'strName', trace => true},
|
||||
{name => 'strId', trace => true},
|
||||
{name => 'oIo', trace => true},
|
||||
);
|
||||
|
||||
# Create JSON object
|
||||
$self->{oJSON} = JSON::PP->new()->allow_nonref();
|
||||
|
||||
# Setup the keepalive timer
|
||||
$self->{fKeepAliveTimeout} = $self->io()->timeout() / 2 > 120 ? 120 : $self->io()->timeout() / 2;
|
||||
$self->{fKeepAliveTime} = gettimeofday();
|
||||
|
||||
# Set the error prefix used when raising error messages
|
||||
$self->{strErrorPrefix} = 'raised from ' . $self->{strId};
|
||||
|
||||
# Check greeting to be sure the protocol matches
|
||||
$self->greetingRead();
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self, trace => true}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# DESTROY
|
||||
####################################################################################################################################
|
||||
sub DESTROY
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
$self->close();
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# greetingRead
|
||||
#
|
||||
# Read the greeting and make sure it is as expected.
|
||||
####################################################################################################################################
|
||||
sub greetingRead
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Get the first line of output from the remote if possible
|
||||
my $strGreeting = $self->io()->readLine(true);
|
||||
|
||||
# Parse the greeting and make sure it is valid
|
||||
my $hGreeting;
|
||||
|
||||
eval
|
||||
{
|
||||
$hGreeting = $self->{oJSON}->decode($strGreeting);
|
||||
|
||||
return true;
|
||||
}
|
||||
# Report any error that stopped parsing
|
||||
or do
|
||||
{
|
||||
$self->io()->error(ERROR_PROTOCOL, 'invalid protocol greeting', $strGreeting);
|
||||
};
|
||||
|
||||
# Error if greeting parameters do not match
|
||||
for my $hParam ({strName => 'name', strExpected => PROJECT_NAME},
|
||||
{strName => 'version', strExpected => PROJECT_VERSION},
|
||||
{strName => 'service', strExpected => $self->{strName}})
|
||||
{
|
||||
if (!defined($hGreeting->{$hParam->{strName}}) || $hGreeting->{$hParam->{strName}} ne $hParam->{strExpected})
|
||||
{
|
||||
confess &log(ERROR,
|
||||
'found name \'' . (defined($hGreeting->{$hParam->{strName}}) ? $hGreeting->{$hParam->{strName}} : '[undef]') .
|
||||
"' in protocol greeting instead of expected '$hParam->{strExpected}'", ERROR_HOST_CONNECT);
|
||||
}
|
||||
}
|
||||
|
||||
# Perform noop to catch errors early
|
||||
$self->noOp();
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# outputRead
|
||||
#
|
||||
# Read output from the remote process.
|
||||
####################################################################################################################################
|
||||
sub outputRead
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$bOutputRequired,
|
||||
$bSuppressLog,
|
||||
$bWarnOnError,
|
||||
$bRef,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->outputRead', \@_,
|
||||
{name => 'bOutputRequired', default => false, trace => true},
|
||||
{name => 'bSuppressLog', required => false, trace => true},
|
||||
{name => 'bWarnOnError', default => false, trace => true},
|
||||
{name => 'bRef', default => false, trace => true},
|
||||
);
|
||||
|
||||
my $strProtocolResult = $self->io()->readLine();
|
||||
|
||||
logDebugMisc
|
||||
(
|
||||
$strOperation, undef,
|
||||
{name => 'strProtocolResult', value => $strProtocolResult, trace => true}
|
||||
);
|
||||
|
||||
my $hResult = $self->{oJSON}->decode($strProtocolResult);
|
||||
|
||||
# Raise any errors
|
||||
if (defined($hResult->{err}))
|
||||
{
|
||||
my $strError = $self->{strErrorPrefix} . (defined($hResult->{out}) ? ": $hResult->{out}" : '');
|
||||
|
||||
# Raise the error if a warning is not requested
|
||||
if (!$bWarnOnError)
|
||||
{
|
||||
confess &log(
|
||||
ERROR, $strError . (defined($hResult->{errStack}) ? "\n$hResult->{errStack}" : ''), $hResult->{err}, $bSuppressLog);
|
||||
}
|
||||
|
||||
&log(WARN, $strError, $hResult->{err});
|
||||
undef($hResult->{out});
|
||||
}
|
||||
|
||||
# Reset the keep alive time
|
||||
$self->{fKeepAliveTime} = gettimeofday();
|
||||
|
||||
# If output is required and there is no output, raise exception
|
||||
if ($bOutputRequired && !defined($hResult->{out}))
|
||||
{
|
||||
confess &log(ERROR, "$self->{strErrorPrefix}: output is not defined", ERROR_PROTOCOL_OUTPUT_REQUIRED);
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'hOutput', value => $hResult->{out}, ref => $bRef, trace => true}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# cmdWrite
|
||||
#
|
||||
# Send command to remote process.
|
||||
####################################################################################################################################
|
||||
sub cmdWrite
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strCommand,
|
||||
$hParam,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->cmdWrite', \@_,
|
||||
{name => 'strCommand', trace => true},
|
||||
{name => 'hParam', required => false, trace => true},
|
||||
);
|
||||
|
||||
my $strProtocolCommand = $self->{oJSON}->encode({cmd => $strCommand, param => $hParam});
|
||||
|
||||
logDebugMisc
|
||||
(
|
||||
$strOperation, undef,
|
||||
{name => 'strProtocolCommand', value => $strProtocolCommand, trace => true}
|
||||
);
|
||||
|
||||
# Write out the command
|
||||
$self->io()->writeLine($strProtocolCommand);
|
||||
|
||||
# Reset the keep alive time
|
||||
$self->{fKeepAliveTime} = gettimeofday();
|
||||
|
||||
# Return from function and log return values if any
|
||||
logDebugReturn($strOperation);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# cmdExecute
|
||||
#
|
||||
# Send command to remote process and wait for output.
|
||||
####################################################################################################################################
|
||||
sub cmdExecute
|
||||
{
|
||||
my $self = shift;
|
||||
my $strCommand = shift;
|
||||
my $oParamRef = shift;
|
||||
my $bOutputRequired = shift;
|
||||
my $bWarnOnError = shift;
|
||||
|
||||
$self->cmdWrite($strCommand, $oParamRef);
|
||||
|
||||
return $self->outputRead($bOutputRequired, undef, $bWarnOnError);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# keepAlive
|
||||
#
|
||||
# Send periodic noops so the remote does not time out.
|
||||
####################################################################################################################################
|
||||
sub keepAlive
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
if (gettimeofday() - $self->{fKeepAliveTimeout} > $self->{fKeepAliveTime})
|
||||
{
|
||||
$self->noOp();
|
||||
}
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# noOp
|
||||
#
|
||||
# Send noop to test connection or keep it alive.
|
||||
####################################################################################################################################
|
||||
sub noOp
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
$self->cmdExecute(OP_NOOP, undef, false);
|
||||
$self->{fKeepAliveTime} = gettimeofday();
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# Getters
|
||||
####################################################################################################################################
|
||||
sub io {shift->{oIo}}
|
||||
sub master {true}
|
||||
|
||||
1;
|
@ -1,247 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# Protocol Minion Base
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Base::Minion;
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
use English '-no_match_vars';
|
||||
|
||||
use Exporter qw(import);
|
||||
our @EXPORT = qw();
|
||||
use JSON::PP;
|
||||
|
||||
use pgBackRest::Common::Exception;
|
||||
use pgBackRest::Common::Ini;
|
||||
use pgBackRest::Common::Lock;
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Common::String;
|
||||
use pgBackRest::LibC qw(:lock);
|
||||
use pgBackRest::Protocol::Base::Master;
|
||||
use pgBackRest::Protocol::Helper;
|
||||
use pgBackRest::Version;
|
||||
|
||||
####################################################################################################################################
|
||||
# Constant used to define code to run after each operation
|
||||
####################################################################################################################################
|
||||
use constant OP_POST => 'post';
|
||||
push @EXPORT, qw(OP_POST);
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift; # Class name
|
||||
|
||||
# Create the class hash
|
||||
my $self = {};
|
||||
bless $self, $class;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
(
|
||||
my $strOperation,
|
||||
$self->{strName},
|
||||
$self->{oIo},
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'strName', trace => true},
|
||||
{name => 'oIo', trace => true},
|
||||
);
|
||||
|
||||
# Create JSON object
|
||||
$self->{oJSON} = JSON::PP->new()->allow_nonref();
|
||||
|
||||
# Write the greeting so master process knows who we are
|
||||
$self->greetingWrite();
|
||||
|
||||
# Initialize module variables
|
||||
$self->{hCommandMap} = $self->can('init') ? $self->init() : undef;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self, trace => true}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# greetingWrite
|
||||
#
|
||||
# Send a greeting to the master process.
|
||||
####################################################################################################################################
|
||||
sub greetingWrite
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Write the greeting
|
||||
$self->io()->writeLine((JSON::PP->new()->canonical()->allow_nonref())->encode(
|
||||
{name => PROJECT_NAME, service => $self->{strName}, version => PROJECT_VERSION}));
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# errorWrite
|
||||
#
|
||||
# Write errors with error codes in protocol format, otherwise write to stderr and exit with error.
|
||||
####################################################################################################################################
|
||||
sub errorWrite
|
||||
{
|
||||
my $self = shift;
|
||||
my $oException = shift;
|
||||
|
||||
# Throw hard error if this is not a standard exception
|
||||
if (!isException(\$oException))
|
||||
{
|
||||
confess &log(ERROR, 'unknown error: ' . $oException, ERROR_UNKNOWN);
|
||||
}
|
||||
|
||||
# Write error code and message
|
||||
$self->io()->writeLine($self->{oJSON}->encode({err => $oException->code(), out => $oException->message()}));
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# outputWrite
|
||||
#
|
||||
# Write output for the master process.
|
||||
####################################################################################################################################
|
||||
sub outputWrite
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
$self->io()->writeLine($self->{oJSON}->encode({out => \@_}));
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# cmdRead
|
||||
#
|
||||
# Read command sent by the master process.
|
||||
####################################################################################################################################
|
||||
sub cmdRead
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
my $hCommand = $self->{oJSON}->decode($self->io()->readLine());
|
||||
|
||||
return $hCommand->{cmd}, $hCommand->{param};
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# process
|
||||
####################################################################################################################################
|
||||
sub process
|
||||
{
|
||||
my $self = shift;
|
||||
my $strLockPath = shift;
|
||||
my $strLockCommand = shift;
|
||||
my $strLockStanza = shift;
|
||||
my $iProcessId = shift;
|
||||
|
||||
# Reset stderr log level so random errors do not get output
|
||||
logLevelSet(undef, undef, OFF);
|
||||
|
||||
# A permanent error will be returned from every command once it has been set. In some cases this is a more graceful way to
|
||||
# exit than a hard error.
|
||||
my $oPermanentError;
|
||||
|
||||
# Loop until the exit command is received
|
||||
eval
|
||||
{
|
||||
# Acquire a lock if required (this will be determined by lockAcquire()). This is done here so any errors will be
|
||||
# transmitted through the protocol layer and cause a graceful shutdown rather than a remote abort.
|
||||
if (defined($strLockPath) && defined($strLockStanza) && $iProcessId == 0)
|
||||
{
|
||||
eval
|
||||
{
|
||||
if (lockAcquire($strLockPath, $strLockCommand, $strLockStanza, 30, true))
|
||||
{
|
||||
# Check if processes have been stopped
|
||||
lockStopTest();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
or do
|
||||
{
|
||||
$oPermanentError = $EVAL_ERROR;
|
||||
};
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
my ($strCommand, $rParam) = $self->cmdRead();
|
||||
|
||||
last if ($strCommand eq OP_EXIT);
|
||||
|
||||
# If permanent error is set then always return it
|
||||
if (defined($oPermanentError))
|
||||
{
|
||||
$self->errorWrite($oPermanentError);
|
||||
}
|
||||
# Else process as usual
|
||||
else
|
||||
{
|
||||
eval
|
||||
{
|
||||
# Check for the command in the map and run it if found
|
||||
if (defined($self->{hCommandMap}{$strCommand}))
|
||||
{
|
||||
$self->outputWrite($self->{hCommandMap}{$strCommand}->($rParam));
|
||||
}
|
||||
# Run the standard NOOP command. This this can be overridden in hCommandMap to implement a custom NOOP.
|
||||
elsif ($strCommand eq OP_NOOP)
|
||||
{
|
||||
protocolKeepAlive();
|
||||
$self->outputWrite();
|
||||
}
|
||||
else
|
||||
{
|
||||
confess "invalid command: ${strCommand}";
|
||||
}
|
||||
|
||||
# Run the post command if defined
|
||||
if (defined($self->{hCommandMap}{&OP_POST}))
|
||||
{
|
||||
$self->{hCommandMap}{&OP_POST}->();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
# Process errors
|
||||
or do
|
||||
{
|
||||
$self->errorWrite($EVAL_ERROR);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
or do
|
||||
{
|
||||
my $oException = $EVAL_ERROR;
|
||||
|
||||
# Change log level so error will go to stderr
|
||||
logLevelSet(undef, undef, PROTOCOL);
|
||||
|
||||
# If standard exception
|
||||
if (isException(\$oException))
|
||||
{
|
||||
confess &log($oException->level(), $oException->message(), $oException->code());
|
||||
}
|
||||
|
||||
# Else unexpected Perl exception
|
||||
confess &log(ERROR, 'unknown error: ' . $oException, ERROR_UNKNOWN);
|
||||
};
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# Getters
|
||||
####################################################################################################################################
|
||||
sub io {shift->{oIo}}
|
||||
sub master {false}
|
||||
|
||||
1;
|
@ -1,148 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# PROTOCOL COMMAND MASTER MODULE
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Command::Master;
|
||||
use parent 'pgBackRest::Protocol::Base::Master';
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
use English '-no_match_vars';
|
||||
|
||||
use File::Basename qw(dirname);
|
||||
use Time::HiRes qw(gettimeofday);
|
||||
|
||||
use pgBackRest::Common::Exception;
|
||||
use pgBackRest::Common::Ini;
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Common::Io::Process;
|
||||
use pgBackRest::Protocol::Base::Master;
|
||||
use pgBackRest::Version;
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift; # Class name
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strName, # Name of the protocol
|
||||
$strId, # Id of this process for error messages
|
||||
$strCommand, # Command to execute on local/remote
|
||||
$iBufferMax, # Maximum buffer size
|
||||
$iCompressLevel, # Set compression level
|
||||
$iCompressLevelNetwork, # Set compression level for network only compression
|
||||
$iProtocolTimeout, # Protocol timeout
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'strName'},
|
||||
{name => 'strId'},
|
||||
{name => 'strCommand'},
|
||||
{name => 'iBufferMax'},
|
||||
{name => 'iCompressLevel'},
|
||||
{name => 'iCompressLevelNetwork'},
|
||||
{name => 'iProtocolTimeout'},
|
||||
);
|
||||
|
||||
# Set command
|
||||
if (!defined($strCommand))
|
||||
{
|
||||
confess &log(ASSERT, 'strCommand must be set');
|
||||
}
|
||||
|
||||
# Execute the command
|
||||
my $oIo = new pgBackRest::Common::Io::Process(
|
||||
new pgBackRest::Common::Io::Buffered(
|
||||
new pgBackRest::Common::Io::Handle($strId), $iProtocolTimeout, $iBufferMax), $strCommand);
|
||||
|
||||
# Create the class hash
|
||||
my $self = $class->SUPER::new($strName, $strId, $oIo);
|
||||
bless $self, $class;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# close
|
||||
####################################################################################################################################
|
||||
sub close
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$bComplete,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->close', \@_,
|
||||
{name => 'bComplete', default => false, trace => true},
|
||||
);
|
||||
|
||||
# Exit status defaults to success
|
||||
my $iExitStatus = 0;
|
||||
my $bClosed = false;
|
||||
|
||||
# Only send the exit command if the process is running
|
||||
if (defined($self->io()) && defined($self->io()->processId()))
|
||||
{
|
||||
&log(TRACE, "sending exit command to process");
|
||||
|
||||
eval
|
||||
{
|
||||
$self->cmdWrite(OP_EXIT);
|
||||
return true;
|
||||
}
|
||||
or do
|
||||
{
|
||||
my $oException = $EVAL_ERROR;
|
||||
my $strError = 'unable to shutdown protocol';
|
||||
my $strHint = 'HINT: the process completed all operations successfully but protocol-timeout may need to be increased.';
|
||||
|
||||
if (isException(\$oException))
|
||||
{
|
||||
$iExitStatus = $oException->code();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!defined($oException))
|
||||
{
|
||||
$oException = 'unknown error';
|
||||
}
|
||||
|
||||
$iExitStatus = ERROR_UNKNOWN;
|
||||
}
|
||||
|
||||
&log(WARN,
|
||||
$strError . ($iExitStatus == ERROR_UNKNOWN ? '' : sprintf(' [%03d]', $oException->code())) . ': ' .
|
||||
($iExitStatus == ERROR_UNKNOWN ? $oException : $oException->message()) .
|
||||
($bComplete ? "\n${strHint}" : ''));
|
||||
};
|
||||
|
||||
$self->{oIo}->close();
|
||||
undef($self->{oIo});
|
||||
$bClosed = true;
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'iExitStatus', value => $iExitStatus, trace => !$bClosed}
|
||||
);
|
||||
}
|
||||
|
||||
1;
|
@ -1,62 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# PROTOCOL COMMAND MINION MODULE
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Command::Minion;
|
||||
use parent 'pgBackRest::Protocol::Base::Minion';
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
use English '-no_match_vars';
|
||||
|
||||
use JSON::PP;
|
||||
|
||||
use pgBackRest::Common::Exception;
|
||||
use pgBackRest::Common::Ini;
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Common::String;
|
||||
use pgBackRest::Protocol::Base::Minion;
|
||||
use pgBackRest::Common::Io::Buffered;
|
||||
use pgBackRest::Version;
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift; # Class name
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strName, # Name of the protocol
|
||||
$iBufferMax, # Maximum buffer size
|
||||
$iProtocolTimeout, # Protocol timeout
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'strName'},
|
||||
{name => 'iBufferMax'},
|
||||
{name => 'iProtocolTimeout'},
|
||||
);
|
||||
|
||||
# Open buffered protocol io
|
||||
my $oIo =
|
||||
new pgBackRest::Common::Io::Buffered(
|
||||
new pgBackRest::Common::Io::Handle('stdio', *STDIN, *STDOUT), $iProtocolTimeout, $iBufferMax);
|
||||
|
||||
# Create the class hash
|
||||
my $self = $class->SUPER::new($strName, $oIo);
|
||||
bless $self, $class;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self}
|
||||
);
|
||||
}
|
||||
|
||||
1;
|
@ -1,489 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# Create and manage protocol objects.
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Helper;
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
|
||||
use Exporter qw(import);
|
||||
our @EXPORT = qw();
|
||||
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Config::Config;
|
||||
use pgBackRest::Protocol::Remote::Master;
|
||||
use pgBackRest::Version;
|
||||
|
||||
####################################################################################################################################
|
||||
# Operation constants
|
||||
####################################################################################################################################
|
||||
# Backup module
|
||||
use constant OP_BACKUP_FILE => 'backupFile';
|
||||
push @EXPORT, qw(OP_BACKUP_FILE);
|
||||
|
||||
# Archive Module
|
||||
use constant OP_ARCHIVE_GET_CHECK => 'archiveCheck';
|
||||
push @EXPORT, qw(OP_ARCHIVE_GET_CHECK);
|
||||
|
||||
# Db Module
|
||||
use constant OP_DB_CONNECT => 'dbConnect';
|
||||
push @EXPORT, qw(OP_DB_CONNECT);
|
||||
use constant OP_DB_EXECUTE_SQL => 'dbExecSql';
|
||||
push @EXPORT, qw(OP_DB_EXECUTE_SQL);
|
||||
use constant OP_DB_INFO => 'dbInfo';
|
||||
push @EXPORT, qw(OP_DB_INFO);
|
||||
|
||||
# Storage Module
|
||||
use constant OP_STORAGE_OPEN_READ => 'storageOpenRead';
|
||||
push @EXPORT, qw(OP_STORAGE_OPEN_READ);
|
||||
use constant OP_STORAGE_OPEN_WRITE => 'storageOpenWrite';
|
||||
push @EXPORT, qw(OP_STORAGE_OPEN_WRITE);
|
||||
use constant OP_STORAGE_CIPHER_PASS_USER => 'storageCipherPassUser';
|
||||
push @EXPORT, qw(OP_STORAGE_CIPHER_PASS_USER);
|
||||
use constant OP_STORAGE_EXISTS => 'storageExists';
|
||||
push @EXPORT, qw(OP_STORAGE_EXISTS);
|
||||
use constant OP_STORAGE_HASH_SIZE => 'storageHashSize';
|
||||
push @EXPORT, qw(OP_STORAGE_HASH_SIZE);
|
||||
use constant OP_STORAGE_LIST => 'storageList';
|
||||
push @EXPORT, qw(OP_STORAGE_LIST);
|
||||
use constant OP_STORAGE_MANIFEST => 'storageManifest';
|
||||
push @EXPORT, qw(OP_STORAGE_MANIFEST);
|
||||
use constant OP_STORAGE_MOVE => 'storageMove';
|
||||
push @EXPORT, qw(OP_STORAGE_MOVE);
|
||||
use constant OP_STORAGE_PATH_GET => 'storagePathGet';
|
||||
push @EXPORT, qw(OP_STORAGE_PATH_GET);
|
||||
|
||||
# Restore module
|
||||
use constant OP_RESTORE_FILE => 'restoreFile';
|
||||
push @EXPORT, qw(OP_RESTORE_FILE);
|
||||
|
||||
# Wait
|
||||
use constant OP_WAIT => 'wait';
|
||||
push @EXPORT, qw(OP_WAIT);
|
||||
|
||||
####################################################################################################################################
|
||||
# Module variables
|
||||
####################################################################################################################################
|
||||
my $hProtocol = {}; # Global remote hash that is created on first request
|
||||
|
||||
####################################################################################################################################
|
||||
# isRepoLocal
|
||||
#
|
||||
# Is the backup/archive repository local?
|
||||
####################################################################################################################################
|
||||
sub isRepoLocal
|
||||
{
|
||||
# Not valid for remote
|
||||
if (cfgCommandTest(CFGCMD_REMOTE) && !cfgOptionTest(CFGOPT_TYPE, CFGOPTVAL_REMOTE_TYPE_BACKUP))
|
||||
{
|
||||
confess &log(ASSERT, 'isRepoLocal() not valid on ' . cfgOption(CFGOPT_TYPE) . ' remote');
|
||||
}
|
||||
|
||||
return cfgOptionTest(CFGOPT_REPO_HOST) ? false : true;
|
||||
}
|
||||
|
||||
push @EXPORT, qw(isRepoLocal);
|
||||
|
||||
####################################################################################################################################
|
||||
# isDbLocal - is the database local?
|
||||
####################################################################################################################################
|
||||
sub isDbLocal
|
||||
{
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$iRemoteIdx,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '::isDbLocal', \@_,
|
||||
{name => 'iRemoteIdx', optional => true, default => cfgOptionValid(CFGOPT_HOST_ID) ? cfgOption(CFGOPT_HOST_ID) : 1,
|
||||
trace => true},
|
||||
);
|
||||
|
||||
# Not valid for remote
|
||||
if (cfgCommandTest(CFGCMD_REMOTE) && !cfgOptionTest(CFGOPT_TYPE, CFGOPTVAL_REMOTE_TYPE_DB))
|
||||
{
|
||||
confess &log(ASSERT, 'isDbLocal() not valid on ' . cfgOption(CFGOPT_TYPE) . ' remote');
|
||||
}
|
||||
|
||||
my $bLocal = cfgOptionTest(cfgOptionIdFromIndex(CFGOPT_PG_HOST, $iRemoteIdx)) ? false : true;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'bLocal', value => $bLocal, trace => true}
|
||||
);
|
||||
}
|
||||
|
||||
push @EXPORT, qw(isDbLocal);
|
||||
|
||||
####################################################################################################################################
|
||||
# Gets the parameters required to setup the protocol
|
||||
####################################################################################################################################
|
||||
sub protocolParam
|
||||
{
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strCommand,
|
||||
$strRemoteType,
|
||||
$iRemoteIdx,
|
||||
$strBackRestBin,
|
||||
$iProcessIdx,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '::protocolParam', \@_,
|
||||
{name => 'strCommand'},
|
||||
{name => 'strRemoteType'},
|
||||
{name => 'iRemoteIdx', default => cfgOptionValid(CFGOPT_HOST_ID) ? cfgOption(CFGOPT_HOST_ID) : 1},
|
||||
{name => 'strBackRestBin', optional => true},
|
||||
{name => 'iProcessIdx', optional => true,
|
||||
default => cfgOptionValid(CFGOPT_PROCESS) ? cfgOption(CFGOPT_PROCESS, false) : undef},
|
||||
);
|
||||
|
||||
# Return the remote when required
|
||||
my $iOptionIdCmd = CFGOPT_REPO_HOST_CMD;
|
||||
my $iOptionIdConfig = CFGOPT_REPO_HOST_CONFIG;
|
||||
my $iOptionIdConfigIncludePath = CFGOPT_REPO_HOST_CONFIG_INCLUDE_PATH;
|
||||
my $iOptionIdConfigPath = CFGOPT_REPO_HOST_CONFIG_PATH;
|
||||
my $iOptionIdHost = CFGOPT_REPO_HOST;
|
||||
my $iOptionIdUser = CFGOPT_REPO_HOST_USER;
|
||||
my $strOptionDbPath = undef;
|
||||
my $strOptionDbPort = undef;
|
||||
my $strOptionDbSocketPath = undef;
|
||||
my $strOptionSshPort = CFGOPT_REPO_HOST_PORT;
|
||||
|
||||
if ($strRemoteType eq CFGOPTVAL_REMOTE_TYPE_DB)
|
||||
{
|
||||
$iOptionIdCmd = cfgOptionIdFromIndex(CFGOPT_PG_HOST_CMD, $iRemoteIdx);
|
||||
$iOptionIdConfig = cfgOptionIdFromIndex(CFGOPT_PG_HOST_CONFIG, $iRemoteIdx);
|
||||
$iOptionIdConfigIncludePath = cfgOptionIdFromIndex(CFGOPT_PG_HOST_CONFIG_INCLUDE_PATH, $iRemoteIdx);
|
||||
$iOptionIdConfigPath = cfgOptionIdFromIndex(CFGOPT_PG_HOST_CONFIG_PATH, $iRemoteIdx);
|
||||
$iOptionIdHost = cfgOptionIdFromIndex(CFGOPT_PG_HOST, $iRemoteIdx);
|
||||
$iOptionIdUser = cfgOptionIdFromIndex(CFGOPT_PG_HOST_USER, $iRemoteIdx);
|
||||
$strOptionSshPort = cfgOptionIdFromIndex(CFGOPT_PG_HOST_PORT, $iRemoteIdx);
|
||||
}
|
||||
|
||||
# Db path is not valid in all contexts (restore, for instance)
|
||||
if (cfgOptionValid(cfgOptionIdFromIndex(CFGOPT_PG_PATH, $iRemoteIdx)))
|
||||
{
|
||||
$strOptionDbPath =
|
||||
cfgOptionSource(cfgOptionIdFromIndex(CFGOPT_PG_PATH, $iRemoteIdx)) eq CFGDEF_SOURCE_DEFAULT ?
|
||||
undef : cfgOption(cfgOptionIdFromIndex(CFGOPT_PG_PATH, $iRemoteIdx));
|
||||
}
|
||||
|
||||
# Db port is not valid in all contexts (restore, for instance)
|
||||
if (cfgOptionValid(cfgOptionIdFromIndex(CFGOPT_PG_PORT, $iRemoteIdx)))
|
||||
{
|
||||
$strOptionDbPort =
|
||||
cfgOptionSource(cfgOptionIdFromIndex(CFGOPT_PG_PORT, $iRemoteIdx)) eq CFGDEF_SOURCE_DEFAULT ?
|
||||
undef : cfgOption(cfgOptionIdFromIndex(CFGOPT_PG_PORT, $iRemoteIdx));
|
||||
}
|
||||
|
||||
# Db socket is not valid in all contexts (restore, for instance)
|
||||
if (cfgOptionValid(cfgOptionIdFromIndex(CFGOPT_PG_SOCKET_PATH, $iRemoteIdx)))
|
||||
{
|
||||
$strOptionDbSocketPath =
|
||||
cfgOptionSource(cfgOptionIdFromIndex(CFGOPT_PG_SOCKET_PATH, $iRemoteIdx)) eq CFGDEF_SOURCE_DEFAULT ?
|
||||
undef : cfgOption(cfgOptionIdFromIndex(CFGOPT_PG_SOCKET_PATH, $iRemoteIdx));
|
||||
}
|
||||
|
||||
# Build hash to set and override command options
|
||||
my $rhCommandOption =
|
||||
{
|
||||
&CFGOPT_COMMAND => {value => $strCommand},
|
||||
&CFGOPT_PROCESS => {value => defined($iProcessIdx) ? $iProcessIdx : 0},
|
||||
&CFGOPT_CONFIG =>
|
||||
{value => cfgOptionValid($iOptionIdConfig) && cfgOptionSource($iOptionIdConfig) eq CFGDEF_SOURCE_DEFAULT ?
|
||||
undef : cfgOption($iOptionIdConfig)},
|
||||
&CFGOPT_CONFIG_INCLUDE_PATH =>
|
||||
{value => cfgOptionValid($iOptionIdConfigIncludePath) &&
|
||||
cfgOptionSource($iOptionIdConfigIncludePath) eq CFGDEF_SOURCE_DEFAULT ?
|
||||
undef : cfgOption($iOptionIdConfigIncludePath)},
|
||||
&CFGOPT_CONFIG_PATH =>
|
||||
{value => cfgOptionValid($iOptionIdConfigPath) && cfgOptionSource($iOptionIdConfigPath) eq CFGDEF_SOURCE_DEFAULT ?
|
||||
undef : cfgOption($iOptionIdConfigPath)},
|
||||
&CFGOPT_TYPE => {value => $strRemoteType},
|
||||
&CFGOPT_LOG_PATH => {},
|
||||
&CFGOPT_LOCK_PATH => {},
|
||||
|
||||
# Only enable file logging on the remote when requested
|
||||
&CFGOPT_LOG_LEVEL_FILE => {value => cfgOption(CFGOPT_LOG_SUBPROCESS) ? cfgOption(CFGOPT_LOG_LEVEL_FILE) : lc(OFF)},
|
||||
|
||||
# Don't pass CFGOPT_LOG_LEVEL_STDERR because in the case of the local process calling the remote process the
|
||||
# option will be set to 'protocol' which is not a valid value from the command line.
|
||||
&CFGOPT_LOG_LEVEL_STDERR => {},
|
||||
|
||||
cfgOptionIdFromIndex(CFGOPT_PG_PATH, 1) => {value => $strOptionDbPath},
|
||||
cfgOptionIdFromIndex(CFGOPT_PG_PORT, 1) => {value => $strOptionDbPort},
|
||||
cfgOptionIdFromIndex(CFGOPT_PG_SOCKET_PATH, 1) => {value => $strOptionDbSocketPath},
|
||||
|
||||
# Set protocol options explicitly so values are not picked up from remote config files
|
||||
&CFGOPT_BUFFER_SIZE => {value => cfgOption(CFGOPT_BUFFER_SIZE)},
|
||||
&CFGOPT_COMPRESS_LEVEL => {value => cfgOption(CFGOPT_COMPRESS_LEVEL)},
|
||||
&CFGOPT_COMPRESS_LEVEL_NETWORK => {value => cfgOption(CFGOPT_COMPRESS_LEVEL_NETWORK)},
|
||||
&CFGOPT_PROTOCOL_TIMEOUT => {value => cfgOption(CFGOPT_PROTOCOL_TIMEOUT)}
|
||||
};
|
||||
|
||||
# Override some per-db options that shouldn't be passed to the command. ??? This could be done better as a new define
|
||||
# for these options which would then be implemented in cfgCommandWrite().
|
||||
for (my $iOptionIdx = 1; $iOptionIdx <= cfgOptionIndexTotal(CFGOPT_PG_HOST); $iOptionIdx++)
|
||||
{
|
||||
if ($iOptionIdx != 1)
|
||||
{
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_HOST_CONFIG, $iOptionIdx)} = {};
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_HOST_CONFIG_INCLUDE_PATH, $iOptionIdx)} = {};
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_HOST_CONFIG_PATH, $iOptionIdx)} = {};
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_HOST, $iOptionIdx)} = {};
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_PATH, $iOptionIdx)} = {};
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_PORT, $iOptionIdx)} = {};
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_SOCKET_PATH, $iOptionIdx)} = {};
|
||||
}
|
||||
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_HOST_CMD, $iOptionIdx)} = {};
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_HOST_USER, $iOptionIdx)} = {};
|
||||
$rhCommandOption->{cfgOptionIdFromIndex(CFGOPT_PG_HOST_PORT, $iOptionIdx)} = {};
|
||||
}
|
||||
|
||||
# Generate the remote command
|
||||
my $strRemoteCommand = cfgCommandWrite(
|
||||
CFGCMD_REMOTE, true, defined($strBackRestBin) ? $strBackRestBin : cfgOption($iOptionIdCmd), undef, $rhCommandOption);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'strRemoteHost', value => cfgOption($iOptionIdHost)},
|
||||
{name => 'strRemoteHostUser', value => cfgOption($iOptionIdUser)},
|
||||
{name => 'strRemoteHostSshPort', value => cfgOption($strOptionSshPort, false)},
|
||||
{name => 'strRemoteCommand', value => $strRemoteCommand},
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# protocolGet
|
||||
#
|
||||
# Get the protocol object or create it if does not exist. Shared protocol objects are used because they create an SSH connection
|
||||
# to the remote host and the number of these connections should be minimized.
|
||||
####################################################################################################################################
|
||||
sub protocolGet
|
||||
{
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strRemoteType,
|
||||
$iRemoteIdx,
|
||||
$bCache,
|
||||
$strBackRestBin,
|
||||
$iProcessIdx,
|
||||
$strCommand,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '::protocolGet', \@_,
|
||||
{name => 'strRemoteType'},
|
||||
{name => 'iRemoteIdx', default => cfgOptionValid(CFGOPT_HOST_ID) ? cfgOption(CFGOPT_HOST_ID) : 1},
|
||||
{name => 'bCache', optional => true, default => true},
|
||||
{name => 'strBackRestBin', optional => true},
|
||||
{name => 'iProcessIdx', optional => true},
|
||||
{name => 'strCommand', optional => true,
|
||||
default => cfgOptionValid(CFGOPT_COMMAND) ? cfgOption(CFGOPT_COMMAND) : cfgCommandName(cfgCommandGet())},
|
||||
);
|
||||
|
||||
# Protocol object
|
||||
my $oProtocol;
|
||||
|
||||
# If no remote requested or if the requested remote type is local then return a local protocol object
|
||||
if (!cfgOptionTest(
|
||||
cfgOptionIdFromIndex($strRemoteType eq CFGOPTVAL_REMOTE_TYPE_DB ? CFGOPT_PG_HOST : CFGOPT_REPO_HOST, $iRemoteIdx)))
|
||||
{
|
||||
confess &log(ASSERT, 'protocol cannot be created when remote host is not specified');
|
||||
}
|
||||
# Else create the remote protocol
|
||||
else
|
||||
{
|
||||
# Set protocol to cached value
|
||||
$oProtocol =
|
||||
$bCache && defined($$hProtocol{$strRemoteType}{$iRemoteIdx}) ? $$hProtocol{$strRemoteType}{$iRemoteIdx} : undef;
|
||||
|
||||
if ($bCache && $$hProtocol{$strRemoteType}{$iRemoteIdx})
|
||||
{
|
||||
$oProtocol = $$hProtocol{$strRemoteType}{$iRemoteIdx};
|
||||
logDebugMisc($strOperation, 'found cached protocol');
|
||||
|
||||
# Issue a noop on protocol pulled from the cache to be sure it is still functioning. It's better to get an error at
|
||||
# request time than somewhere randomly later.
|
||||
$oProtocol->noOp();
|
||||
}
|
||||
|
||||
# If protocol was not returned from cache then create it
|
||||
if (!defined($oProtocol))
|
||||
{
|
||||
logDebugMisc($strOperation, 'create (' . ($bCache ? '' : 'un') . 'cached) remote protocol');
|
||||
|
||||
my ($strRemoteHost, $strRemoteHostUser, $strRemoteHostSshPort, $strRemoteCommand) = protocolParam(
|
||||
$strCommand, $strRemoteType, $iRemoteIdx, {strBackRestBin => $strBackRestBin, iProcessIdx => $iProcessIdx});
|
||||
|
||||
$oProtocol = new pgBackRest::Protocol::Remote::Master
|
||||
(
|
||||
cfgOption(CFGOPT_CMD_SSH),
|
||||
$strRemoteCommand,
|
||||
cfgOption(CFGOPT_BUFFER_SIZE),
|
||||
cfgOption(CFGOPT_COMPRESS_LEVEL),
|
||||
cfgOption(CFGOPT_COMPRESS_LEVEL_NETWORK),
|
||||
$strRemoteHost,
|
||||
$strRemoteHostUser,
|
||||
$strRemoteHostSshPort,
|
||||
cfgOption(CFGOPT_PROTOCOL_TIMEOUT)
|
||||
);
|
||||
|
||||
# Cache the protocol
|
||||
if ($bCache)
|
||||
{
|
||||
$$hProtocol{$strRemoteType}{$iRemoteIdx} = $oProtocol;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'oProtocol', value => $oProtocol, trace => true}
|
||||
);
|
||||
}
|
||||
|
||||
push @EXPORT, qw(protocolGet);
|
||||
|
||||
####################################################################################################################################
|
||||
# protocolList - list all active protocols
|
||||
####################################################################################################################################
|
||||
sub protocolList
|
||||
{
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strRemoteType,
|
||||
$iRemoteIdx,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '::protocolList', \@_,
|
||||
{name => 'strRemoteType', required => false, trace => true},
|
||||
{name => 'iRemoteIdx', required => false, trace => true},
|
||||
);
|
||||
|
||||
my @oyProtocol;
|
||||
|
||||
if (!defined($strRemoteType))
|
||||
{
|
||||
foreach my $strRemoteType (sort(keys(%{$hProtocol})))
|
||||
{
|
||||
push(@oyProtocol, protocolList($strRemoteType));
|
||||
}
|
||||
}
|
||||
elsif (!defined($iRemoteIdx))
|
||||
{
|
||||
foreach my $iRemoteIdx (sort(keys(%{$hProtocol->{$strRemoteType}})))
|
||||
{
|
||||
push(@oyProtocol, protocolList($strRemoteType, $iRemoteIdx));
|
||||
}
|
||||
}
|
||||
elsif (defined($hProtocol->{$strRemoteType}{$iRemoteIdx}))
|
||||
{
|
||||
push(@oyProtocol, {strRemoteType => $strRemoteType, iRemoteIdx => $iRemoteIdx});
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'oyProtocol', value => \@oyProtocol, trace => true}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# protocolDestroy
|
||||
#
|
||||
# Undefine the protocol if it is stored locally.
|
||||
####################################################################################################################################
|
||||
sub protocolDestroy
|
||||
{
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strRemoteType,
|
||||
$iRemoteIdx,
|
||||
$bComplete,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '::protocolDestroy', \@_,
|
||||
{name => 'strRemoteType', required => false},
|
||||
{name => 'iRemoteIdx', required => false},
|
||||
{name => 'bComplete', default => false},
|
||||
);
|
||||
|
||||
my $iExitStatus = 0;
|
||||
|
||||
foreach my $rhProtocol (protocolList($strRemoteType, $iRemoteIdx))
|
||||
{
|
||||
logDebugMisc(
|
||||
$strOperation, 'found cached protocol',
|
||||
{name => 'strRemoteType', value => $rhProtocol->{strRemoteType}},
|
||||
{name => 'iRemoteIdx', value => $rhProtocol->{iRemoteIdx}});
|
||||
|
||||
$iExitStatus = $hProtocol->{$rhProtocol->{strRemoteType}}{$rhProtocol->{iRemoteIdx}}->close($bComplete);
|
||||
delete($hProtocol->{$rhProtocol->{strRemoteType}}{$rhProtocol->{iRemoteIdx}});
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'iExitStatus', value => $iExitStatus}
|
||||
);
|
||||
}
|
||||
|
||||
push @EXPORT, qw(protocolDestroy);
|
||||
|
||||
####################################################################################################################################
|
||||
# protocolKeepAlive - call keepAlive() on all protocols
|
||||
####################################################################################################################################
|
||||
sub protocolKeepAlive
|
||||
{
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strRemoteType,
|
||||
$iRemoteIdx,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '::protocolDestroy', \@_,
|
||||
{name => 'strRemoteType', required => false, trace => true},
|
||||
{name => 'iRemoteIdx', required => false, trace => true},
|
||||
);
|
||||
|
||||
foreach my $rhProtocol (protocolList($strRemoteType, $iRemoteIdx))
|
||||
{
|
||||
$hProtocol->{$rhProtocol->{strRemoteType}}{$rhProtocol->{iRemoteIdx}}->keepAlive();
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn($strOperation);
|
||||
}
|
||||
|
||||
push @EXPORT, qw(protocolKeepAlive);
|
||||
|
||||
1;
|
@ -1,51 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# PROTOCOL LOCAL MASTER MODULE
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Local::Master;
|
||||
use parent 'pgBackRest::Protocol::Command::Master';
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Config::Config;
|
||||
use pgBackRest::Protocol::Command::Master;
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strCommand,
|
||||
$iProcessIdx,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'strCommand'},
|
||||
{name => 'iProcessIdx', default => 1},
|
||||
);
|
||||
|
||||
# Init object and store variables
|
||||
my $self = $class->SUPER::new(
|
||||
'local', "local-${iProcessIdx} process", $strCommand, cfgOption(CFGOPT_BUFFER_SIZE),
|
||||
cfgOption(CFGOPT_COMPRESS_LEVEL), cfgOption(CFGOPT_COMPRESS_LEVEL_NETWORK), cfgOption(CFGOPT_PROTOCOL_TIMEOUT));
|
||||
|
||||
bless $self, $class;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self}
|
||||
);
|
||||
}
|
||||
|
||||
1;
|
@ -1,667 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# PROTOCOL LOCAL PROCESS MODULE
|
||||
#
|
||||
# This module can be extended by commands that want to perform jobs in parallel.
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Local::Process;
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
use English '-no_match_vars';
|
||||
|
||||
use IO::Select;
|
||||
|
||||
use pgBackRest::Common::Exception;
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Config::Config;
|
||||
use pgBackRest::Protocol::Local::Master;
|
||||
use pgBackRest::Version;
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
|
||||
my $self = {};
|
||||
bless $self, $class;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
(
|
||||
my $strOperation,
|
||||
$self->{strHostType},
|
||||
$self->{iSelectTimeout},
|
||||
$self->{strBackRestBin},
|
||||
$self->{bConfessError},
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'strHostType'},
|
||||
{name => 'iSelectTimeout', default => int(cfgOption(CFGOPT_PROTOCOL_TIMEOUT) / 2)},
|
||||
{name => 'strBackRestBin', default => projectBin()},
|
||||
{name => 'bConfessError', default => true},
|
||||
);
|
||||
|
||||
# Declare host map and array
|
||||
$self->{hHostMap} = {};
|
||||
$self->{hyHost} = undef;
|
||||
|
||||
# Reset module variables to get ready for queueing
|
||||
$self->reset();
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# reset
|
||||
#
|
||||
# Reset to initial state.
|
||||
####################################################################################################################################
|
||||
sub reset
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my ($strOperation) = logDebugParam(__PACKAGE__ . '->reset');
|
||||
|
||||
# Select object is used to check for new results
|
||||
$self->{oSelect} = undef;
|
||||
|
||||
# Declare local process map and array
|
||||
$self->{hyLocalMap} = undef;
|
||||
$self->{hyLocal} = undef;
|
||||
|
||||
# Set the processing flag to false
|
||||
$self->{bProcessing} = false;
|
||||
|
||||
# Initialize job total to 0
|
||||
$self->{iQueued} = 0;
|
||||
|
||||
# Initialize running job total to 0
|
||||
$self->{iRunning} = 0;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn($strOperation);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# hostAdd
|
||||
#
|
||||
# Add a host where jobs can be executed.
|
||||
####################################################################################################################################
|
||||
sub hostAdd
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$iHostConfigIdx,
|
||||
$iProcessMax,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->hostAdd', \@_,
|
||||
{name => 'iHostConfigIdx'},
|
||||
{name => 'iProcessMax'},
|
||||
);
|
||||
|
||||
my $iHostIdx = $self->{hHostMap}{$iHostConfigIdx};
|
||||
|
||||
if (!defined($iHostIdx))
|
||||
{
|
||||
$iHostIdx = defined($self->{hyHost}) ? @{$self->{hyHost}} : 0;
|
||||
$self->{hHostMap}{$iHostConfigIdx} = $iHostIdx;
|
||||
}
|
||||
|
||||
my $hHost =
|
||||
{
|
||||
iHostConfigIdx => $iHostConfigIdx,
|
||||
iProcessMax => $iProcessMax,
|
||||
};
|
||||
|
||||
push(@{$self->{hyHost}}, $hHost);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn($strOperation);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# hostConnect
|
||||
#
|
||||
# Connect local processes to the hosts.
|
||||
####################################################################################################################################
|
||||
sub hostConnect
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my ($strOperation) = logDebugParam(__PACKAGE__ . '->hostConnect');
|
||||
|
||||
# Create a select object used to monitor output from minions
|
||||
$self->{oSelect} = IO::Select->new();
|
||||
|
||||
# Iterate hosts
|
||||
my $iHostIdx = 0;
|
||||
|
||||
foreach my $hHost (@{$self->{hyHost}})
|
||||
{
|
||||
# If there are no jobs in the queue for this host then no need to connect
|
||||
if (!defined($hHost->{hyQueue}))
|
||||
{
|
||||
logDebugMisc(
|
||||
$strOperation, "no jobs for host",
|
||||
{name => 'strHostType', value => $self->{strHostType}},
|
||||
{name => 'iHostConfigIdx', value => $hHost->{iHostConfigIdx}});
|
||||
next;
|
||||
}
|
||||
|
||||
for (my $iHostProcessIdx = 0; $iHostProcessIdx < $hHost->{iProcessMax}; $iHostProcessIdx++)
|
||||
{
|
||||
my $iLocalIdx = defined($self->{hyLocal}) ? @{$self->{hyLocal}} : 0;
|
||||
my $iProcessId = $iLocalIdx + 1;
|
||||
|
||||
logDebugMisc(
|
||||
$strOperation, 'start local process',
|
||||
{name => 'strHostType', value => $self->{strHostType}},
|
||||
{name => 'iHostProcessIdx', value => $iHostProcessIdx},
|
||||
{name => 'iHostConfigIdx', value => $hHost->{iHostConfigIdx}},
|
||||
{name => 'iHostIdx', value => $iHostIdx},
|
||||
{name => 'iProcessId', value => $iProcessId});
|
||||
|
||||
my $oLocal = new pgBackRest::Protocol::Local::Master
|
||||
(
|
||||
cfgCommandWrite(
|
||||
CFGCMD_LOCAL, true, $self->{strBackRestBin}, undef,
|
||||
{
|
||||
&CFGOPT_COMMAND => {value => cfgCommandName(cfgCommandGet())},
|
||||
&CFGOPT_PROCESS => {value => $iProcessId},
|
||||
&CFGOPT_TYPE => {value => $self->{strHostType}},
|
||||
&CFGOPT_HOST_ID => {value => $hHost->{iHostConfigIdx}},
|
||||
|
||||
# Only enable file logging on the local when requested
|
||||
&CFGOPT_LOG_LEVEL_FILE =>
|
||||
{value => cfgOption(CFGOPT_LOG_SUBPROCESS) ? cfgOption(CFGOPT_LOG_LEVEL_FILE) : lc(OFF)},
|
||||
&CFGOPT_LOG_LEVEL_STDERR => {},
|
||||
}),
|
||||
$iLocalIdx + 1
|
||||
);
|
||||
|
||||
my $hLocal =
|
||||
{
|
||||
iHostIdx => $iHostIdx,
|
||||
iProcessId => $iProcessId,
|
||||
iHostProcessIdx => $iHostProcessIdx,
|
||||
oLocal => $oLocal,
|
||||
hndIn => fileno($oLocal->io()->handleRead()),
|
||||
};
|
||||
|
||||
push(@{$self->{hyLocal}}, $hLocal);
|
||||
|
||||
$self->{hLocalMap}{$hLocal->{hndIn}} = $hLocal;
|
||||
$self->{oSelect}->add($hLocal->{hndIn});
|
||||
}
|
||||
|
||||
$iHostIdx++;
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'bResult', value => $iHostIdx > 0 ? true : false}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# init
|
||||
#
|
||||
# Initialize data structures required for processing.
|
||||
####################################################################################################################################
|
||||
sub init
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my ($strOperation) = logDebugParam(__PACKAGE__ . '->init');
|
||||
|
||||
if ($self->hostConnect())
|
||||
{
|
||||
foreach my $hLocal (@{$self->{hyLocal}})
|
||||
{
|
||||
my $hHost = $self->{hyHost}[$hLocal->{iHostIdx}];
|
||||
my $hyQueue = $hHost->{hyQueue};
|
||||
|
||||
# Initialize variables to keep track of what job the local is working on
|
||||
$hLocal->{iDirection} = $hLocal->{iHostProcessIdx} % 2 == 0 ? 1 : -1;
|
||||
$hLocal->{iQueueIdx} = int((@{$hyQueue} / $hHost->{iProcessMax}) * $hLocal->{iHostProcessIdx});
|
||||
|
||||
# Calculate the last queue that this process should pull from
|
||||
$hLocal->{iQueueLastIdx} = $hLocal->{iQueueIdx} + ($hLocal->{iDirection} * -1);
|
||||
|
||||
if ($hLocal->{iQueueLastIdx} < 0)
|
||||
{
|
||||
$hLocal->{iQueueLastIdx} = @{$hyQueue} - 1;
|
||||
}
|
||||
elsif ($hLocal->{iQueueLastIdx} >= @{$hyQueue})
|
||||
{
|
||||
$hLocal->{iQueueLastIdx} = 0;
|
||||
}
|
||||
|
||||
logDebugMisc(
|
||||
$strOperation, 'init local process',
|
||||
{name => 'iHostIdx', value => $hLocal->{iHostIdx}},
|
||||
{name => 'iProcessId', value => $hLocal->{iProcessId}},
|
||||
{name => 'iDirection', value => $hLocal->{iDirection}},
|
||||
{name => 'iQueueIdx', value => $hLocal->{iQueueIdx}},
|
||||
{name => 'iQueueLastIdx', value => $hLocal->{iQueueLastIdx}});
|
||||
}
|
||||
|
||||
$self->{bProcessing} = true;
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'bResult', value => $self->processing()}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# process
|
||||
#
|
||||
# Run all jobs and return results in batches.
|
||||
####################################################################################################################################
|
||||
sub process
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my ($strOperation) = logDebugParam(__PACKAGE__ . '->process');
|
||||
|
||||
# Initialize processing
|
||||
if (!$self->processing())
|
||||
{
|
||||
if (!$self->init())
|
||||
{
|
||||
logDebugMisc($strOperation, 'no jobs to run');
|
||||
$self->reset();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
# If jobs are running then wait for any of them to complete
|
||||
my @hyResult = ();
|
||||
my $iCompleted = 0;
|
||||
|
||||
if ($self->{iRunning} > 0)
|
||||
{
|
||||
&logDebugMisc(
|
||||
$strOperation, 'check running jobs',
|
||||
{name => 'iRunning', value => $self->{iRunning}, trace => true});
|
||||
|
||||
# Wait for results to be available on any of the local process inputs
|
||||
my @hndyIn = $self->{oSelect}->can_read($self->{iSelectTimeout});
|
||||
|
||||
# Fetch results from the completed jobs
|
||||
foreach my $hndIn (@hndyIn)
|
||||
{
|
||||
# Get the local data
|
||||
my $hLocal = $self->{hLocalMap}{$hndIn};
|
||||
|
||||
if (!defined($hLocal))
|
||||
{
|
||||
confess &log(ASSERT, "unable to map from fileno ${hndIn} to local");
|
||||
}
|
||||
|
||||
# Get the job result
|
||||
my $hJob = $hLocal->{hJob};
|
||||
|
||||
eval
|
||||
{
|
||||
$hJob->{rResult} = $hLocal->{oLocal}->outputRead(true, undef, undef, true);
|
||||
|
||||
# Create a result array when the result is not already an array. The Perl locals always return an array but the C
|
||||
# locals only do so when needed.
|
||||
if (ref($hJob->{rResult}) ne 'ARRAY')
|
||||
{
|
||||
my @resultArray = (${$hJob->{rResult}});
|
||||
$hJob->{rResult} = \@resultArray;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
or do
|
||||
{
|
||||
my $oException = $EVAL_ERROR;
|
||||
|
||||
# If not a backrest exception then always confess it - something has gone very wrong
|
||||
confess $oException if (!isException(\$oException));
|
||||
|
||||
# If the process is has terminated throw the exception
|
||||
if (!defined($hLocal->{oLocal}->io()->processId()))
|
||||
{
|
||||
confess logException($oException);
|
||||
}
|
||||
|
||||
# If errors should be confessed then do so
|
||||
if ($self->{bConfessError})
|
||||
{
|
||||
confess logException($oException);
|
||||
}
|
||||
# Else store exception so caller can process it
|
||||
else
|
||||
{
|
||||
$hJob->{oException} = $oException;
|
||||
}
|
||||
};
|
||||
|
||||
$hJob->{iProcessId} = $hLocal->{iProcessId};
|
||||
push(@hyResult, $hJob);
|
||||
|
||||
logDebugMisc(
|
||||
$strOperation, 'job complete',
|
||||
{name => 'iProcessId', value => $hJob->{iProcessId}},
|
||||
{name => 'strKey', value => $hJob->{strKey}},
|
||||
{name => 'rResult', value => $hJob->{rResult}});
|
||||
|
||||
# Free the local process to receive another job
|
||||
$hLocal->{hJob} = undef;
|
||||
$self->{iRunning}--;
|
||||
$iCompleted++;
|
||||
}
|
||||
}
|
||||
|
||||
# If any jobs are not running/completed then assign new jobs
|
||||
if ($self->{iRunning} == 0 || $iCompleted > 0)
|
||||
{
|
||||
&logDebugMisc(
|
||||
$strOperation, 'get new jobs',
|
||||
{name => 'iRunning', value => $self->{iRunning}, trace => true},
|
||||
{name => 'iCompleted', value => $iCompleted, trace => true});
|
||||
|
||||
my $bFound = false;
|
||||
my $iLocalIdx = -1;
|
||||
|
||||
# Iterate all local processes
|
||||
foreach my $hLocal (@{$self->{hyLocal}})
|
||||
{
|
||||
# Skip this local process if it has already completed
|
||||
$iLocalIdx++;
|
||||
next if (!defined($hLocal));
|
||||
|
||||
my $hHost = $self->{hyHost}[$hLocal->{iHostIdx}];
|
||||
my $hyQueue = $hHost->{hyQueue};
|
||||
|
||||
# If this process does not currently have a job assigned then find one
|
||||
if (!defined($hLocal->{hJob}))
|
||||
{
|
||||
# Search queues for a new job
|
||||
my $iQueueIdx = $hLocal->{iQueueIdx};
|
||||
my $hJob = shift(@{$$hyQueue[$iQueueIdx]});
|
||||
|
||||
while (!defined($hJob) && $iQueueIdx != $hLocal->{iQueueLastIdx})
|
||||
{
|
||||
$iQueueIdx += $hLocal->{iDirection};
|
||||
|
||||
if ($iQueueIdx < 0)
|
||||
{
|
||||
$iQueueIdx = @{$hyQueue} - 1;
|
||||
}
|
||||
elsif ($iQueueIdx >= @{$hyQueue})
|
||||
{
|
||||
$iQueueIdx = 0;
|
||||
}
|
||||
|
||||
$hJob = shift(@{$$hyQueue[$iQueueIdx]});
|
||||
}
|
||||
|
||||
# If no job was found then stop the local process
|
||||
if (!defined($hJob))
|
||||
{
|
||||
logDebugMisc(
|
||||
$strOperation, 'no jobs found, stop local',
|
||||
{name => 'strHostType', value => $hLocal->{strHostType}},
|
||||
{name => 'iHostConfigIdx', value => $hLocal->{iHostConfigIdx}},
|
||||
{name => 'iHostIdx', value => $hLocal->{iHostIdx}},
|
||||
{name => 'iProcessId', value => $hLocal->{iProcessId}});
|
||||
|
||||
# Remove input handle from the select object
|
||||
my $iHandleTotal = $self->{oSelect}->count();
|
||||
|
||||
$self->{oSelect}->remove($hLocal->{hndIn});
|
||||
|
||||
if ($iHandleTotal - $self->{oSelect}->count() != 1)
|
||||
{
|
||||
confess &log(ASSERT,
|
||||
"iProcessId $hLocal->{iProcessId}, handle $hLocal->{hndIn} was not removed from select object");
|
||||
}
|
||||
|
||||
# Remove input handle from the map
|
||||
delete($self->{hLocalMap}{$hLocal->{hndIn}});
|
||||
|
||||
# Close the local process
|
||||
$hLocal->{oLocal}->close(true);
|
||||
|
||||
# Undefine local process so it is no longer checked for new jobs
|
||||
undef(${$self->{hyLocal}}[$iLocalIdx]);
|
||||
|
||||
# Skip to next local process
|
||||
next;
|
||||
}
|
||||
|
||||
# Assign job to local process
|
||||
$hLocal->{hJob} = $hJob;
|
||||
$bFound = true;
|
||||
$self->{iRunning}++;
|
||||
$self->{iQueued}--;
|
||||
|
||||
logDebugMisc(
|
||||
$strOperation, 'get job from queue',
|
||||
{name => 'iHostIdx', value => $hLocal->{iHostIdx}},
|
||||
{name => 'iProcessId', value => $hLocal->{iProcessId}},
|
||||
{name => 'strQueueIdx', value => $iQueueIdx},
|
||||
{name => 'strKey', value => $hLocal->{hJob}{strKey}});
|
||||
|
||||
# Send job to local process
|
||||
$hLocal->{oLocal}->cmdWrite($hLocal->{hJob}{strOp}, $hLocal->{hJob}->{rParam});
|
||||
}
|
||||
}
|
||||
|
||||
# If nothing is running, no more jobs, and nothing to return, then processing is complete
|
||||
if (!$bFound && !$self->{iRunning} && @hyResult == 0)
|
||||
{
|
||||
logDebugMisc($strOperation, 'all jobs complete');
|
||||
$self->reset();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
# Return job results
|
||||
return \@hyResult;
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# queueJob
|
||||
#
|
||||
# Queue a job for processing.
|
||||
####################################################################################################################################
|
||||
sub queueJob
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$iHostConfigIdx,
|
||||
$strQueue,
|
||||
$strKey,
|
||||
$strOp,
|
||||
$rParam,
|
||||
$rParamSecure,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->queueJob', \@_,
|
||||
{name => 'iHostConfigIdx'},
|
||||
{name => 'strQueue'},
|
||||
{name => 'strKey'},
|
||||
{name => 'strOp'},
|
||||
{name => 'rParam'},
|
||||
{name => 'rParamSecure', optional => true, redact => true},
|
||||
);
|
||||
|
||||
# Don't add jobs while in the middle of processing the current queue
|
||||
if ($self->processing())
|
||||
{
|
||||
confess &log(ASSERT, 'new jobs cannot be added until processing is complete');
|
||||
}
|
||||
|
||||
# Copy the parameters to a new variable so we can push the secure parameters on
|
||||
if (defined($rParamSecure))
|
||||
{
|
||||
push(@{$rParam}, @{$rParamSecure});
|
||||
}
|
||||
|
||||
# Build the hash
|
||||
my $hJob =
|
||||
{
|
||||
iHostConfigIdx => $iHostConfigIdx,
|
||||
strQueue => $strQueue,
|
||||
strKey => $strKey,
|
||||
strOp => $strOp,
|
||||
rParam => $rParam,
|
||||
};
|
||||
|
||||
# Get the host that will perform this job
|
||||
my $iHostIdx = $self->{hHostMap}{$iHostConfigIdx};
|
||||
|
||||
if (!defined($iHostIdx))
|
||||
{
|
||||
confess &log(ASSERT, "iHostConfigIdx = $iHostConfigIdx does not exist");
|
||||
}
|
||||
|
||||
my $hHost = $self->{hyHost}[$iHostIdx];
|
||||
|
||||
# Get the queue to hold this job
|
||||
my $iQueueIdx = $hHost->{hQueueMap}{$strQueue};
|
||||
|
||||
if (!defined($iQueueIdx))
|
||||
{
|
||||
$iQueueIdx = defined($hHost->{hyQueue}) ? @{$hHost->{hyQueue}} : 0;
|
||||
$hHost->{hQueueMap}{$strQueue} = $iQueueIdx;
|
||||
}
|
||||
|
||||
push(@{$hHost->{hyQueue}[$iQueueIdx]}, $hJob);
|
||||
$self->{iQueued}++;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn($strOperation);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# dequeueJobs
|
||||
#
|
||||
# Dequeue all jobs from a queue.
|
||||
####################################################################################################################################
|
||||
sub dequeueJobs
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$iHostConfigIdx,
|
||||
$strQueue,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->dequeueJobs', \@_,
|
||||
{name => 'iHostConfigIdx'},
|
||||
{name => 'strQueue'},
|
||||
);
|
||||
|
||||
# Don't add jobs while in the middle of processing the current queue
|
||||
if (!$self->processing())
|
||||
{
|
||||
confess &log(ASSERT, 'unable to dequeue a job when not processing');
|
||||
}
|
||||
|
||||
# Get the host that contains the queue to clear
|
||||
my $iHostIdx = $self->{hHostMap}{$iHostConfigIdx};
|
||||
|
||||
if (!defined($iHostIdx))
|
||||
{
|
||||
confess &log(ASSERT, "iHostConfigIdx = $iHostConfigIdx does not exist");
|
||||
}
|
||||
|
||||
my $hHost = $self->{hyHost}[$iHostIdx];
|
||||
|
||||
# Get the queue to clear
|
||||
my $iQueueIdx = $hHost->{hQueueMap}{$strQueue};
|
||||
|
||||
if (!defined($iQueueIdx))
|
||||
{
|
||||
confess &log(ASSERT, "unable to find queue '${strQueue}'");
|
||||
}
|
||||
|
||||
$hHost->{hyQueue}[$iQueueIdx] = [];
|
||||
$self->{iQueued} = 0;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn($strOperation);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# jobTotal
|
||||
#
|
||||
# Total jobs in the queue.
|
||||
####################################################################################################################################
|
||||
sub jobTotal
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my ($strOperation) = logDebugParam(__PACKAGE__ . '->jobTotal');
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'iJobTotal', value => $self->{iQueued} + $self->{iRunning}}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# processing
|
||||
#
|
||||
# Are jobs being processed?
|
||||
####################################################################################################################################
|
||||
sub processing
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my ($strOperation) = logDebugParam(__PACKAGE__ . '->processing');
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'bProcessing', value => $self->{bProcessing}, trace => true}
|
||||
);
|
||||
}
|
||||
|
||||
1;
|
@ -1,76 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# PROTOCOL REMOTE MASTER MODULE
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Remote::Master;
|
||||
use parent 'pgBackRest::Protocol::Command::Master';
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
|
||||
use File::Basename qw(dirname);
|
||||
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Config::Config;
|
||||
use pgBackRest::Protocol::Command::Master;
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strCommandSSH, # SSH client
|
||||
$strCommand, # Command to execute on local/remote
|
||||
$iBufferMax, # Maximum buffer size
|
||||
$iCompressLevel, # Set compression level
|
||||
$iCompressLevelNetwork, # Set compression level for network only compression
|
||||
$strHost, # Host to connect to for remote (optional as this can also be used for local)
|
||||
$strUser, # User to connect to for remote (must be set if strHost is set)
|
||||
$iSshPort, # Specified if other than default port is needed for ssh
|
||||
$iProtocolTimeout, # Protocol timeout
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'strCommandSSH'},
|
||||
{name => 'strCommand'},
|
||||
{name => 'iBufferMax'},
|
||||
{name => 'iCompressLevel'},
|
||||
{name => 'iCompressLevelNetwork'},
|
||||
{name => 'strHost'},
|
||||
{name => 'strUser'},
|
||||
{name => 'iSshPort', required => false},
|
||||
{name => 'iProtocolTimeout'},
|
||||
);
|
||||
|
||||
my $strCommandSshPort = defined($iSshPort) ? '-p ' . $iSshPort . ' ' : '';
|
||||
|
||||
# Create SSH command
|
||||
$strCommand =
|
||||
"${strCommandSSH} -o LogLevel=error -o Compression=no -o PasswordAuthentication=no $strCommandSshPort" .
|
||||
"${strUser}\@${strHost} '${strCommand}'";
|
||||
|
||||
# Init object and store variables
|
||||
my $self = $class->SUPER::new(
|
||||
'remote', "remote process on '$strHost'", $strCommand, $iBufferMax, $iCompressLevel, $iCompressLevelNetwork,
|
||||
$iProtocolTimeout);
|
||||
bless $self, $class;
|
||||
|
||||
# Store the host
|
||||
$self->{strHost} = $strHost;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self}
|
||||
);
|
||||
}
|
||||
|
||||
1;
|
@ -1,122 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# PROTOCOL REMOTE MINION MODULE
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Remote::Minion;
|
||||
use parent 'pgBackRest::Protocol::Command::Minion';
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
|
||||
use File::Basename qw(dirname);
|
||||
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Common::Io::Buffered;
|
||||
use pgBackRest::Common::Wait;
|
||||
use pgBackRest::Config::Config;
|
||||
use pgBackRest::Db;
|
||||
use pgBackRest::Protocol::Command::Minion;
|
||||
use pgBackRest::Protocol::Helper;
|
||||
use pgBackRest::Protocol::Storage::Helper;
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift; # Class name
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$iBufferMax, # Maximum buffer size
|
||||
$iProtocolTimeout # Protocol timeout
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'iBufferMax'},
|
||||
{name => 'iProtocolTimeout'}
|
||||
);
|
||||
|
||||
# Init object and store variables
|
||||
my $self = $class->SUPER::new(cfgCommandName(CFGCMD_REMOTE), $iBufferMax, $iProtocolTimeout);
|
||||
bless $self, $class;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# init
|
||||
####################################################################################################################################
|
||||
sub init
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my ($strOperation) = logDebugParam(__PACKAGE__ . '->init');
|
||||
|
||||
# Create objects
|
||||
my $oStorage = cfgOptionTest(CFGOPT_TYPE, CFGOPTVAL_REMOTE_TYPE_DB) ? storageDb() : storageRepo();
|
||||
|
||||
my $oDb = cfgOptionTest(CFGOPT_TYPE, CFGOPTVAL_REMOTE_TYPE_DB) ? new pgBackRest::Db() : undef;
|
||||
|
||||
# Create anonymous subs for each command
|
||||
my $hCommandMap =
|
||||
{
|
||||
# Db commands
|
||||
&OP_DB_CONNECT => sub {$oDb->connect()},
|
||||
&OP_DB_EXECUTE_SQL => sub {$oDb->executeSql(@{shift()})},
|
||||
&OP_DB_INFO => sub {$oDb->info(@{shift()})},
|
||||
|
||||
# File commands
|
||||
&OP_STORAGE_OPEN_READ => sub
|
||||
{
|
||||
my $oSourceFileIo = $oStorage->openRead(@{shift()});
|
||||
|
||||
# If the source file exists
|
||||
if (defined($oSourceFileIo) && (!defined($oSourceFileIo->{oStorageCRead}) || $oSourceFileIo->open()))
|
||||
{
|
||||
$self->outputWrite(true);
|
||||
|
||||
$oStorage->copy(
|
||||
$oSourceFileIo, new pgBackRest::Protocol::Storage::File($self, $oSourceFileIo), {bSourceOpen => true});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
},
|
||||
&OP_STORAGE_OPEN_WRITE => sub
|
||||
{
|
||||
my $oDestinationFileIo = $oStorage->openWrite(@{shift()});
|
||||
$oStorage->copy(new pgBackRest::Protocol::Storage::File($self, $oDestinationFileIo), $oDestinationFileIo);
|
||||
},
|
||||
|
||||
&OP_STORAGE_CIPHER_PASS_USER => sub {$oStorage->cipherPassUser()},
|
||||
&OP_STORAGE_EXISTS => sub {$oStorage->exists(@{shift()})},
|
||||
&OP_STORAGE_LIST => sub {$oStorage->list(@{shift()})},
|
||||
&OP_STORAGE_MANIFEST => sub {$oStorage->manifestJson(@{shift()})},
|
||||
&OP_STORAGE_MOVE => sub {$oStorage->move(@{shift()})},
|
||||
&OP_STORAGE_PATH_GET => sub {$oStorage->pathGet(@{shift()})},
|
||||
&OP_STORAGE_HASH_SIZE => sub {$oStorage->hashSize(@{shift()})},
|
||||
|
||||
# Wait command
|
||||
&OP_WAIT => sub {waitRemainder(@{shift()})},
|
||||
};
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'hCommandMap', value => $hCommandMap}
|
||||
);
|
||||
}
|
||||
|
||||
1;
|
@ -1,180 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# Protocol File
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Storage::File;
|
||||
use parent 'pgBackRest::Common::Io::Base';
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
use English '-no_match_vars';
|
||||
|
||||
use Exporter qw(import);
|
||||
our @EXPORT = qw();
|
||||
|
||||
use pgBackRest::Common::Exception;
|
||||
use pgBackRest::Common::Log;
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$oProtocol, # Master or minion protocol
|
||||
$oFileIo, # File whose results will be returned via protocol
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'oProtocol', trace => true},
|
||||
{name => 'oFileIo', required => false, trace => true},
|
||||
);
|
||||
|
||||
# Create class
|
||||
my $self = $class->SUPER::new($oProtocol->io()->id() . ' file');
|
||||
bless $self, $class;
|
||||
|
||||
# Set variables
|
||||
$self->{oProtocol} = $oProtocol;
|
||||
$self->{oFileIo} = $oFileIo;
|
||||
|
||||
# Set read/write
|
||||
$self->{bWrite} = false;
|
||||
|
||||
# Initialize EOF to false
|
||||
$self->eofSet(false);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# eof - have reads reached eof?
|
||||
####################################################################################################################################
|
||||
sub eof
|
||||
{
|
||||
return shift->{bEOF};
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# eofSet - set eof
|
||||
####################################################################################################################################
|
||||
sub eofSet
|
||||
{
|
||||
my $self = shift;
|
||||
my $bEOF = shift;
|
||||
|
||||
$self->{bEOF} = $bEOF;
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# read - read block from protocol
|
||||
####################################################################################################################################
|
||||
sub read
|
||||
{
|
||||
my $self = shift;
|
||||
my $rtBuffer = shift;
|
||||
|
||||
# After EOF always return 0
|
||||
return 0 if $self->eof();
|
||||
|
||||
my $lBlockSize;
|
||||
|
||||
# Read the block header and make sure it's valid
|
||||
my $strBlockHeader = $self->{oProtocol}->io()->readLine();
|
||||
|
||||
if ($strBlockHeader !~ /^BRBLOCK[0-9]+$/)
|
||||
{
|
||||
confess &log(ERROR, "invalid block header '${strBlockHeader}'", ERROR_FILE_READ);
|
||||
}
|
||||
|
||||
# Get block size from the header
|
||||
$lBlockSize = substr($strBlockHeader, 7);
|
||||
|
||||
# Read block if size > 0
|
||||
if ($lBlockSize > 0)
|
||||
{
|
||||
$self->{oProtocol}->io()->read($rtBuffer, $lBlockSize, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
$self->eofSet(true);
|
||||
}
|
||||
|
||||
# Return the block size
|
||||
return $lBlockSize;
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# write - write block to protocol
|
||||
####################################################################################################################################
|
||||
sub write
|
||||
{
|
||||
my $self = shift;
|
||||
my $rtBuffer = shift;
|
||||
|
||||
# Set write
|
||||
$self->{bWrite} = true;
|
||||
|
||||
# Get the buffer size
|
||||
my $lBlockSize = defined($rtBuffer) ? length($$rtBuffer) : 0;
|
||||
|
||||
# Write if size > 0 (0 ends the copy stream so it should only be done in close())
|
||||
if ($lBlockSize > 0)
|
||||
{
|
||||
# Write block header to the protocol stream
|
||||
$self->{oProtocol}->io()->writeLine("BRBLOCK${lBlockSize}");
|
||||
|
||||
# Write block if size
|
||||
$self->{oProtocol}->io()->write($rtBuffer);
|
||||
}
|
||||
|
||||
return length($$rtBuffer);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# close - set the result hash
|
||||
####################################################################################################################################
|
||||
sub close
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Close if protocol is defined (to prevent this from running more than once)
|
||||
if (defined($self->{oProtocol}))
|
||||
{
|
||||
# If writing output terminator
|
||||
if ($self->{bWrite})
|
||||
{
|
||||
$self->{oProtocol}->io()->writeLine("BRBLOCK0");
|
||||
}
|
||||
|
||||
# On master read the results
|
||||
if ($self->{oProtocol}->master())
|
||||
{
|
||||
($self->{rhResult}) = $self->{oProtocol}->outputRead();
|
||||
|
||||
# Minion will send one more output message after file is closed which can be ignored
|
||||
$self->{oProtocol}->outputRead();
|
||||
}
|
||||
# On minion write the results
|
||||
else
|
||||
{
|
||||
$self->{oProtocol}->outputWrite($self->{oFileIo}->resultAll());
|
||||
}
|
||||
|
||||
# Delete protocol to prevent close from running again
|
||||
delete($self->{oProtocol});
|
||||
}
|
||||
}
|
||||
|
||||
1;
|
@ -14,8 +14,6 @@ use File::Basename qw(basename);
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Config::Config;
|
||||
use pgBackRest::LibC qw(:storage);
|
||||
use pgBackRest::Protocol::Helper;
|
||||
use pgBackRest::Protocol::Storage::Remote;
|
||||
use pgBackRest::Storage::Helper;
|
||||
|
||||
####################################################################################################################################
|
||||
@ -45,35 +43,24 @@ sub storageDb
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$iRemoteIdx,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '::storageDb', \@_,
|
||||
{name => 'iRemoteIdx', optional => true, default => cfgOptionValid(CFGOPT_HOST_ID) ? cfgOption(CFGOPT_HOST_ID) : 1,
|
||||
trace => true},
|
||||
);
|
||||
|
||||
# Create storage if not defined
|
||||
if (!defined($hStorage->{&STORAGE_DB}{$iRemoteIdx}))
|
||||
if (!defined($hStorage->{&STORAGE_DB}))
|
||||
{
|
||||
if (isDbLocal({iRemoteIdx => $iRemoteIdx}))
|
||||
{
|
||||
$hStorage->{&STORAGE_DB}{$iRemoteIdx} = new pgBackRest::Storage::Storage(
|
||||
STORAGE_DB, {lBufferMax => cfgOption(CFGOPT_BUFFER_SIZE)});
|
||||
}
|
||||
else
|
||||
{
|
||||
$hStorage->{&STORAGE_DB}{$iRemoteIdx} = new pgBackRest::Protocol::Storage::Remote(
|
||||
protocolGet(CFGOPTVAL_REMOTE_TYPE_DB, $iRemoteIdx));
|
||||
}
|
||||
$hStorage->{&STORAGE_DB} = new pgBackRest::Storage::Storage(
|
||||
STORAGE_DB, {lBufferMax => cfgOption(CFGOPT_BUFFER_SIZE)});
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'oStorageDb', value => $hStorage->{&STORAGE_DB}{$iRemoteIdx}, trace => true},
|
||||
{name => 'oStorageDb', value => $hStorage->{&STORAGE_DB}, trace => true},
|
||||
);
|
||||
}
|
||||
|
||||
@ -99,17 +86,8 @@ sub storageRepo
|
||||
# Create storage if not defined
|
||||
if (!defined($hStorage->{&STORAGE_REPO}))
|
||||
{
|
||||
if (isRepoLocal())
|
||||
{
|
||||
$hStorage->{&STORAGE_REPO} = new pgBackRest::Storage::Storage(
|
||||
STORAGE_REPO, {lBufferMax => cfgOption(CFGOPT_BUFFER_SIZE)});
|
||||
}
|
||||
else
|
||||
{
|
||||
# Create remote storage
|
||||
$hStorage->{&STORAGE_REPO} = new pgBackRest::Protocol::Storage::Remote(
|
||||
protocolGet(CFGOPTVAL_REMOTE_TYPE_BACKUP));
|
||||
}
|
||||
$hStorage->{&STORAGE_REPO} = new pgBackRest::Storage::Storage(
|
||||
STORAGE_REPO, {lBufferMax => cfgOption(CFGOPT_BUFFER_SIZE)});
|
||||
}
|
||||
|
||||
# Return from function and log return values if any
|
||||
|
@ -1,342 +0,0 @@
|
||||
####################################################################################################################################
|
||||
# Remote Storage
|
||||
####################################################################################################################################
|
||||
package pgBackRest::Protocol::Storage::Remote;
|
||||
use parent 'pgBackRest::Storage::Base';
|
||||
|
||||
use strict;
|
||||
use warnings FATAL => qw(all);
|
||||
use Carp qw(confess);
|
||||
use English '-no_match_vars';
|
||||
|
||||
use JSON::PP;
|
||||
|
||||
use pgBackRest::Common::Exception;
|
||||
use pgBackRest::Common::Log;
|
||||
use pgBackRest::Config::Config;
|
||||
use pgBackRest::Protocol::Helper;
|
||||
use pgBackRest::Protocol::Storage::File;
|
||||
use pgBackRest::Storage::Base;
|
||||
|
||||
####################################################################################################################################
|
||||
# new
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$oProtocol,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->new', \@_,
|
||||
{name => 'oProtocol'},
|
||||
);
|
||||
|
||||
# Init parent
|
||||
my $self = $class->SUPER::new({lBufferMax => $oProtocol->io()->bufferMax()});
|
||||
bless $self, $class;
|
||||
|
||||
# Set variables
|
||||
$self->{oProtocol} = $oProtocol;
|
||||
|
||||
# Create JSON object
|
||||
$self->{oJSON} = JSON::PP->new()->allow_nonref();
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'self', value => $self}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# exists
|
||||
####################################################################################################################################
|
||||
sub exists
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strPathExp,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->exists', \@_,
|
||||
{name => 'strPathExp'},
|
||||
);
|
||||
|
||||
my $bExists = $self->{oProtocol}->cmdExecute(OP_STORAGE_EXISTS, [$strPathExp]);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'bExists', value => $bExists}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# hashSize
|
||||
####################################################################################################################################
|
||||
sub hashSize
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strPathExp,
|
||||
$rhParam,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->hashSize', \@_,
|
||||
{name => 'strPathExp'},
|
||||
{name => 'rhParam', required => false},
|
||||
);
|
||||
|
||||
my ($strHash, $lSize) = $self->{oProtocol}->cmdExecute(OP_STORAGE_HASH_SIZE, [$strPathExp, $rhParam]);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'strHash', value => $strHash},
|
||||
{name => 'lSize', value => $lSize}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# list
|
||||
####################################################################################################################################
|
||||
sub list
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strPathExp,
|
||||
$rhParam,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->list', \@_,
|
||||
{name => 'strPathExp'},
|
||||
{name => 'rhParam', required => false},
|
||||
);
|
||||
|
||||
my @stryFileList = $self->{oProtocol}->cmdExecute(OP_STORAGE_LIST, [$strPathExp, $rhParam]);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'stryFileList', value => \@stryFileList}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# manifest
|
||||
####################################################################################################################################
|
||||
sub manifest
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strPathExp,
|
||||
$rhParam,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->manifest', \@_,
|
||||
{name => 'strPathExp'},
|
||||
{name => 'rhParam', required => false},
|
||||
);
|
||||
|
||||
my $hManifest = $self->{oJSON}->decode($self->{oProtocol}->cmdExecute(OP_STORAGE_MANIFEST, [$strPathExp, $rhParam]));
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'hManifest', value => $hManifest, trace => true}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# openRead
|
||||
####################################################################################################################################
|
||||
sub openRead
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strFileExp,
|
||||
$rhParam,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->openRead', \@_,
|
||||
{name => 'strFileExp'},
|
||||
{name => 'rhParam', required => false},
|
||||
);
|
||||
|
||||
my $oSourceFileIo =
|
||||
$self->{oProtocol}->cmdExecute(OP_STORAGE_OPEN_READ, [$strFileExp, $rhParam]) ?
|
||||
new pgBackRest::Protocol::Storage::File($self->{oProtocol}) : undef;
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'oFileIo', value => $oSourceFileIo, trace => true},
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# openWrite
|
||||
####################################################################################################################################
|
||||
sub openWrite
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strFileExp,
|
||||
$rhParam,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->openWrite', \@_,
|
||||
{name => 'strFileExp'},
|
||||
{name => 'rhParam', required => false},
|
||||
);
|
||||
|
||||
# Open the remote file
|
||||
$self->{oProtocol}->cmdWrite(OP_STORAGE_OPEN_WRITE, [$strFileExp, $rhParam]);
|
||||
my $oDestinationFileIo = new pgBackRest::Protocol::Storage::File($self->{oProtocol});
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'oFileIo', value => $oDestinationFileIo, trace => true},
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# pathGet
|
||||
####################################################################################################################################
|
||||
sub pathGet
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strPathExp,
|
||||
$rhParam,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->pathGet', \@_,
|
||||
{name => 'strPathExp', required => false},
|
||||
{name => 'rhParam', required => false},
|
||||
);
|
||||
|
||||
my $strPath = $self->{oProtocol}->cmdExecute(OP_STORAGE_PATH_GET, [$strPathExp, $rhParam]);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'strPath', value => $strPath}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# move
|
||||
####################################################################################################################################
|
||||
sub move
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
$strSourcePathExp,
|
||||
$strDestinationPathExp,
|
||||
$rhParam,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->move', \@_,
|
||||
{name => 'strSourcePathExp'},
|
||||
{name => 'strDestinationPathExp'},
|
||||
{name => 'rhParam', required => false},
|
||||
);
|
||||
|
||||
$self->{oProtocol}->cmdExecute(OP_STORAGE_MOVE, [$strSourcePathExp, $strDestinationPathExp, $rhParam], false);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# cipherPassUser
|
||||
####################################################################################################################################
|
||||
sub cipherPassUser
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Assign function parameters, defaults, and log debug info
|
||||
my
|
||||
(
|
||||
$strOperation,
|
||||
) =
|
||||
logDebugParam
|
||||
(
|
||||
__PACKAGE__ . '->cipherPassUser', \@_,
|
||||
);
|
||||
|
||||
my $strCipherPassUser = $self->{oProtocol}->cmdExecute(OP_STORAGE_CIPHER_PASS_USER);
|
||||
|
||||
# Return from function and log return values if any
|
||||
return logDebugReturn
|
||||
(
|
||||
$strOperation,
|
||||
{name => 'strCipherPassUser', value => $strCipherPassUser, redact => true}
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# getters
|
||||
####################################################################################################################################
|
||||
sub protocol {shift->{oProtocol}};
|
||||
|
||||
1;
|
Reference in New Issue
Block a user