mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-01-18 04:58:51 +02:00
File->hash now works on compressed files.
This commit is contained in:
parent
991afe3b16
commit
4cbc6c650a
@ -24,6 +24,7 @@ use lib dirname($0) . '/../lib';
|
||||
use BackRest::Exception;
|
||||
use BackRest::Utility;
|
||||
use BackRest::Remote;
|
||||
use BackRest::ProcessAsync;
|
||||
|
||||
# Exports
|
||||
use Exporter qw(import);
|
||||
@ -161,6 +162,11 @@ sub DEMOLISH
|
||||
{
|
||||
$self->{oRemote} = undef;
|
||||
}
|
||||
|
||||
if (defined($self->{oProcessAsync}))
|
||||
{
|
||||
$self->{oProcessAsync} = undef;
|
||||
}
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
@ -182,6 +188,21 @@ sub clone
|
||||
);
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# PROCESS_ASYNC_GET
|
||||
####################################################################################################################################
|
||||
sub process_async_get
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
if (!defined($self->{oProcessAsync}))
|
||||
{
|
||||
$self->{oProcessAsync} = new BackRest::ProcessAsync;
|
||||
}
|
||||
|
||||
return $self->{oProcessAsync};
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# PATH_TYPE_GET
|
||||
####################################################################################################################################
|
||||
@ -743,11 +764,12 @@ sub hash
|
||||
my $self = shift;
|
||||
my $strPathType = shift;
|
||||
my $strFile = shift;
|
||||
my $bDecompress = shift;
|
||||
my $bCompressed = shift;
|
||||
my $strHashType = shift;
|
||||
|
||||
# Set defaults
|
||||
$bDecompress = defined($bDecompress) ? $bDecompress : false;
|
||||
$bCompressed = defined($bCompressed) ? $bCompressed : false;
|
||||
$strHashType = defined($strHashType) ? $strHashType : 'sha1';
|
||||
|
||||
# Set operation variables
|
||||
my $strFileOp = $self->path_get($strPathType, $strFile);
|
||||
@ -755,7 +777,9 @@ sub hash
|
||||
|
||||
# Set operation and debug strings
|
||||
my $strOperation = OP_FILE_HASH;
|
||||
my $strDebug = "${strPathType}:${strFileOp}";
|
||||
my $strDebug = "${strPathType}:${strFileOp}, " .
|
||||
'compressed = ' . ($bCompressed ? 'true' : 'false') . ', ' .
|
||||
"hash_type = ${strHashType}";
|
||||
&log(DEBUG, "${strOperation}: ${strDebug}");
|
||||
|
||||
if ($self->is_remote($strPathType))
|
||||
@ -785,45 +809,12 @@ sub hash
|
||||
confess &log(ERROR, "${strDebug}: " . $strError);
|
||||
}
|
||||
|
||||
my $oSHA = Digest::SHA->new(defined($strHashType) ? $strHashType : 'sha1');
|
||||
my $oSHA = Digest::SHA->new($strHashType);
|
||||
|
||||
if ($bDecompress)
|
||||
if ($bCompressed)
|
||||
{
|
||||
my $hPipeOut;
|
||||
my $hPipeIn;
|
||||
|
||||
# Open the in/out pipes
|
||||
pipe $hPipeOut, $hPipeIn;
|
||||
|
||||
# Queue the compression job with the thread
|
||||
$self->{oRemote}->{oThreadQueue}->enqueue('compress:' . fileno($hFile) . ',' . fileno($hPipeIn));
|
||||
|
||||
# Wait for the thread to acknowledge that it has duplicated the file handles
|
||||
my $strMessage = $self->{oThreadResult}->dequeue();
|
||||
|
||||
# Close input pipe so that thread has the only copy
|
||||
if ($strMessage eq 'running')
|
||||
{
|
||||
close($hPipeIn);
|
||||
}
|
||||
# If any other message is returned then error
|
||||
else
|
||||
{
|
||||
confess "unknown thread message while waiting for running: ${strMessage}";
|
||||
}
|
||||
|
||||
$oSHA->addfile($hPipeOut);
|
||||
|
||||
# Make sure the de/compress pipes are closed
|
||||
close($hPipeOut);
|
||||
|
||||
# Wait for the thread to acknowledge that it has completed
|
||||
$strMessage = $self->{oThreadResult}->dequeue();
|
||||
|
||||
if ($strMessage ne 'complete')
|
||||
{
|
||||
confess "unknown thread message while waiting for complete: ${strMessage}";
|
||||
}
|
||||
$oSHA->addfile($self->process_async_get()->process_begin('decompress', $hFile));
|
||||
$self->process_async_get()->process_end();
|
||||
}
|
||||
else
|
||||
{
|
||||
|
188
lib/BackRest/ProcessAsync.pm
Normal file
188
lib/BackRest/ProcessAsync.pm
Normal file
@ -0,0 +1,188 @@
|
||||
####################################################################################################################################
|
||||
# PROCESS ASYNC MODULE
|
||||
####################################################################################################################################
|
||||
package BackRest::ProcessAsync;
|
||||
|
||||
use threads;
|
||||
use strict;
|
||||
use warnings;
|
||||
use Carp;
|
||||
|
||||
use Thread::Queue;
|
||||
use File::Basename;
|
||||
use IO::Handle;
|
||||
use IO::Compress::Gzip qw(gzip $GzipError);
|
||||
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
|
||||
|
||||
use lib dirname($0) . '/../lib';
|
||||
use BackRest::Utility;
|
||||
|
||||
#use Exporter qw(import);
|
||||
#our @EXPORT = qw(new DESTROY thread_kill process_begin process_end);
|
||||
|
||||
####################################################################################################################################
|
||||
# CONSTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my $self = {};
|
||||
|
||||
$self->{oThreadQueue} = Thread::Queue->new();
|
||||
$self->{oThreadResult} = Thread::Queue->new();
|
||||
$self->{oThread} = threads->create(\&process_thread, $self);
|
||||
|
||||
bless $self, $class;
|
||||
return $self;
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# thread_kill
|
||||
####################################################################################################################################
|
||||
sub thread_kill
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
if (defined($self->{oThread}))
|
||||
{
|
||||
$self->{oThreadQueue}->enqueue(undef);
|
||||
$self->{oThread}->join();
|
||||
$self->{oThread} = undef;
|
||||
}
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# DESTRUCTOR
|
||||
####################################################################################################################################
|
||||
sub DESTROY
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
$self->thread_kill();
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# process_thread
|
||||
#
|
||||
# De/Compresses/Checksum data on a thread.
|
||||
####################################################################################################################################
|
||||
sub process_thread
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
while (my $strMessage = $self->{oThreadQueue}->dequeue())
|
||||
{
|
||||
my @stryMessage = split(':', $strMessage);
|
||||
my @strHandle = split(',', $stryMessage[1]);
|
||||
|
||||
my $hIn = IO::Handle->new_from_fd($strHandle[0], '<');
|
||||
my $hOut = IO::Handle->new_from_fd($strHandle[1], '>');
|
||||
|
||||
$self->{oThreadResult}->enqueue('running');
|
||||
|
||||
if ($stryMessage[0] eq 'compress')
|
||||
{
|
||||
gzip($hIn => $hOut)
|
||||
or confess &log(ERROR, 'unable to compress: ' . $GzipError);
|
||||
}
|
||||
elsif ($stryMessage[0] eq 'decompress')
|
||||
{
|
||||
gunzip($hIn => $hOut)
|
||||
or confess &log(ERROR, 'unable to decompress: ' . $GunzipError);
|
||||
}
|
||||
|
||||
close($hOut);
|
||||
|
||||
$self->{oThreadResult}->enqueue('complete');
|
||||
}
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# process_begin
|
||||
#
|
||||
# Begins the de/compress/checksum operation
|
||||
####################################################################################################################################
|
||||
sub process_begin
|
||||
{
|
||||
my $self = shift;
|
||||
my $strProcess = shift;
|
||||
my $hIn = shift;
|
||||
|
||||
# Check if thread is already running
|
||||
if (defined($self->{hPipeOut}))
|
||||
{
|
||||
confess &log(ASSERT, 'thread is already running in ProcessAsync->process_begin()');
|
||||
}
|
||||
|
||||
# Validate strProcess
|
||||
if (!defined($strProcess))
|
||||
{
|
||||
confess &log(ASSERT, 'strProcess must be defined in call to ProcessAsync->process_begin()');
|
||||
}
|
||||
|
||||
if ($strProcess ne 'compress' && $strProcess ne 'decompress' && $strProcess ne 'checksum')
|
||||
{
|
||||
confess &log(ASSERT, 'strProcess must be (compress, decompress, checksum) in call to ProcessAsync->process_begin():' .
|
||||
" ${strProcess} was passed");
|
||||
}
|
||||
|
||||
# Validate hIn
|
||||
if (!defined($hIn))
|
||||
{
|
||||
confess &log(ASSERT, 'strProcess must be defined in call to ProcessAsync->process_begin()');
|
||||
}
|
||||
|
||||
# Open the in/out pipes
|
||||
my $hPipeIn;
|
||||
|
||||
pipe $self->{hPipeOut}, $hPipeIn;
|
||||
|
||||
# Queue the queue job with the thread
|
||||
$self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hIn) . ',' . fileno($hPipeIn));
|
||||
|
||||
# Wait for the thread to acknowledge that it has duplicated the file handles
|
||||
my $strMessage = $self->{oThreadResult}->dequeue();
|
||||
|
||||
# Close input pipe so that thread has the only copy
|
||||
if ($strMessage eq 'running')
|
||||
{
|
||||
close($hPipeIn);
|
||||
}
|
||||
# If any other message is returned then error
|
||||
else
|
||||
{
|
||||
confess "unknown thread message while waiting for running: ${strMessage}";
|
||||
}
|
||||
|
||||
return $self->{hPipeOut};
|
||||
}
|
||||
|
||||
####################################################################################################################################
|
||||
# process_end
|
||||
#
|
||||
# Ends the de/compress/checksum operation
|
||||
####################################################################################################################################
|
||||
sub process_end
|
||||
{
|
||||
my $self = shift;
|
||||
|
||||
# Check if thread is not running
|
||||
if (!defined($self->{hPipeOut}))
|
||||
{
|
||||
confess &log(ASSERT, 'thread is not running in ProcessAsync->process_end()');
|
||||
}
|
||||
|
||||
# Make sure the de/compress pipes are closed
|
||||
close($self->{hPipeOut});
|
||||
$self->{hPipeOut} = undef;
|
||||
|
||||
# Wait for the thread to acknowledge that it has completed
|
||||
my $strMessage = $self->{oThreadResult}->dequeue();
|
||||
|
||||
if ($strMessage ne 'complete')
|
||||
{
|
||||
confess "unknown thread message while waiting for complete: ${strMessage}";
|
||||
}
|
||||
}
|
||||
|
||||
true;
|
@ -788,7 +788,7 @@ sub BackRestTestFile_Test
|
||||
&log(INFO, '--------------------------------------------------------------------------------');
|
||||
&log(INFO, "test File->hash()\n");
|
||||
|
||||
for (my $bRemote = 0; $bRemote <= 1; $bRemote++)
|
||||
for (my $bRemote = false; $bRemote <= true; $bRemote++)
|
||||
{
|
||||
my $oFile = BackRest::File->new
|
||||
(
|
||||
@ -799,13 +799,13 @@ sub BackRestTestFile_Test
|
||||
);
|
||||
|
||||
# Loop through error
|
||||
for (my $bError = 0; $bError <= 1; $bError++)
|
||||
for (my $bError = false; $bError <= true; $bError++)
|
||||
{
|
||||
# Loop through exists
|
||||
for (my $bExists = 0; $bExists <= 1; $bExists++)
|
||||
for (my $bExists = false; $bExists <= true; $bExists++)
|
||||
{
|
||||
# Loop through exists
|
||||
for (my $bCompressed = 0; $bCompressed <= 1; $bCompressed++)
|
||||
for (my $bCompressed = false; $bCompressed <= true; $bCompressed++)
|
||||
{
|
||||
if (!BackRestTestCommon_Run(++$iRun,
|
||||
"rmt ${bRemote}, err ${bError}, exists ${bExists}, cmp ${bCompressed}")) {next}
|
||||
@ -826,6 +826,12 @@ sub BackRestTestFile_Test
|
||||
else
|
||||
{
|
||||
system("echo 'TESTDATA' > ${strFile}");
|
||||
|
||||
if ($bCompressed && !$bRemote)
|
||||
{
|
||||
$oFile->compress(PATH_BACKUP_ABSOLUTE, $strFile);
|
||||
$strFile = $strFile . '.gz';
|
||||
}
|
||||
}
|
||||
|
||||
# Execute in eval in case of error
|
||||
|
Loading…
x
Reference in New Issue
Block a user