From 4cbc6c650a6c1fe3924e91e3fb5b53181a9dfcd9 Mon Sep 17 00:00:00 2001 From: David Steele Date: Thu, 2 Oct 2014 13:54:26 -0400 Subject: [PATCH] File->hash now works on compressed files. --- lib/BackRest/File.pm | 71 +++++------ lib/BackRest/ProcessAsync.pm | 188 ++++++++++++++++++++++++++++++ test/lib/BackRestTest/FileTest.pm | 14 ++- 3 files changed, 229 insertions(+), 44 deletions(-) create mode 100644 lib/BackRest/ProcessAsync.pm diff --git a/lib/BackRest/File.pm b/lib/BackRest/File.pm index b450f03fa..eb61c24cd 100644 --- a/lib/BackRest/File.pm +++ b/lib/BackRest/File.pm @@ -24,6 +24,7 @@ use lib dirname($0) . '/../lib'; use BackRest::Exception; use BackRest::Utility; use BackRest::Remote; +use BackRest::ProcessAsync; # Exports use Exporter qw(import); @@ -161,6 +162,11 @@ sub DEMOLISH { $self->{oRemote} = undef; } + + if (defined($self->{oProcessAsync})) + { + $self->{oProcessAsync} = undef; + } } #################################################################################################################################### @@ -182,6 +188,21 @@ sub clone ); } +#################################################################################################################################### +# PROCESS_ASYNC_GET +#################################################################################################################################### +sub process_async_get +{ + my $self = shift; + + if (!defined($self->{oProcessAsync})) + { + $self->{oProcessAsync} = new BackRest::ProcessAsync; + } + + return $self->{oProcessAsync}; +} + #################################################################################################################################### # PATH_TYPE_GET #################################################################################################################################### @@ -743,11 +764,12 @@ sub hash my $self = shift; my $strPathType = shift; my $strFile = shift; - my $bDecompress = shift; + my $bCompressed = shift; my $strHashType = shift; # Set defaults - $bDecompress = defined($bDecompress) ? $bDecompress : false; + $bCompressed = defined($bCompressed) ? $bCompressed : false; + $strHashType = defined($strHashType) ? $strHashType : 'sha1'; # Set operation variables my $strFileOp = $self->path_get($strPathType, $strFile); @@ -755,7 +777,9 @@ sub hash # Set operation and debug strings my $strOperation = OP_FILE_HASH; - my $strDebug = "${strPathType}:${strFileOp}"; + my $strDebug = "${strPathType}:${strFileOp}, " . + 'compressed = ' . ($bCompressed ? 'true' : 'false') . ', ' . + "hash_type = ${strHashType}"; &log(DEBUG, "${strOperation}: ${strDebug}"); if ($self->is_remote($strPathType)) @@ -785,45 +809,12 @@ sub hash confess &log(ERROR, "${strDebug}: " . $strError); } - my $oSHA = Digest::SHA->new(defined($strHashType) ? $strHashType : 'sha1'); + my $oSHA = Digest::SHA->new($strHashType); - if ($bDecompress) + if ($bCompressed) { - my $hPipeOut; - my $hPipeIn; - - # Open the in/out pipes - pipe $hPipeOut, $hPipeIn; - - # Queue the compression job with the thread - $self->{oRemote}->{oThreadQueue}->enqueue('compress:' . fileno($hFile) . ',' . fileno($hPipeIn)); - - # Wait for the thread to acknowledge that it has duplicated the file handles - my $strMessage = $self->{oThreadResult}->dequeue(); - - # Close input pipe so that thread has the only copy - if ($strMessage eq 'running') - { - close($hPipeIn); - } - # If any other message is returned then error - else - { - confess "unknown thread message while waiting for running: ${strMessage}"; - } - - $oSHA->addfile($hPipeOut); - - # Make sure the de/compress pipes are closed - close($hPipeOut); - - # Wait for the thread to acknowledge that it has completed - $strMessage = $self->{oThreadResult}->dequeue(); - - if ($strMessage ne 'complete') - { - confess "unknown thread message while waiting for complete: ${strMessage}"; - } + $oSHA->addfile($self->process_async_get()->process_begin('decompress', $hFile)); + $self->process_async_get()->process_end(); } else { diff --git a/lib/BackRest/ProcessAsync.pm b/lib/BackRest/ProcessAsync.pm new file mode 100644 index 000000000..a83eaca46 --- /dev/null +++ b/lib/BackRest/ProcessAsync.pm @@ -0,0 +1,188 @@ +#################################################################################################################################### +# PROCESS ASYNC MODULE +#################################################################################################################################### +package BackRest::ProcessAsync; + +use threads; +use strict; +use warnings; +use Carp; + +use Thread::Queue; +use File::Basename; +use IO::Handle; +use IO::Compress::Gzip qw(gzip $GzipError); +use IO::Uncompress::Gunzip qw(gunzip $GunzipError); + +use lib dirname($0) . '/../lib'; +use BackRest::Utility; + +#use Exporter qw(import); +#our @EXPORT = qw(new DESTROY thread_kill process_begin process_end); + +#################################################################################################################################### +# CONSTRUCTOR +#################################################################################################################################### +sub new +{ + my $class = shift; + my $self = {}; + + $self->{oThreadQueue} = Thread::Queue->new(); + $self->{oThreadResult} = Thread::Queue->new(); + $self->{oThread} = threads->create(\&process_thread, $self); + + bless $self, $class; + return $self; +} + +#################################################################################################################################### +# thread_kill +#################################################################################################################################### +sub thread_kill +{ + my $self = shift; + + if (defined($self->{oThread})) + { + $self->{oThreadQueue}->enqueue(undef); + $self->{oThread}->join(); + $self->{oThread} = undef; + } +} + +#################################################################################################################################### +# DESTRUCTOR +#################################################################################################################################### +sub DESTROY +{ + my $self = shift; + + $self->thread_kill(); +} + +#################################################################################################################################### +# process_thread +# +# De/Compresses/Checksum data on a thread. +#################################################################################################################################### +sub process_thread +{ + my $self = shift; + + while (my $strMessage = $self->{oThreadQueue}->dequeue()) + { + my @stryMessage = split(':', $strMessage); + my @strHandle = split(',', $stryMessage[1]); + + my $hIn = IO::Handle->new_from_fd($strHandle[0], '<'); + my $hOut = IO::Handle->new_from_fd($strHandle[1], '>'); + + $self->{oThreadResult}->enqueue('running'); + + if ($stryMessage[0] eq 'compress') + { + gzip($hIn => $hOut) + or confess &log(ERROR, 'unable to compress: ' . $GzipError); + } + elsif ($stryMessage[0] eq 'decompress') + { + gunzip($hIn => $hOut) + or confess &log(ERROR, 'unable to decompress: ' . $GunzipError); + } + + close($hOut); + + $self->{oThreadResult}->enqueue('complete'); + } +} + +#################################################################################################################################### +# process_begin +# +# Begins the de/compress/checksum operation +#################################################################################################################################### +sub process_begin +{ + my $self = shift; + my $strProcess = shift; + my $hIn = shift; + + # Check if thread is already running + if (defined($self->{hPipeOut})) + { + confess &log(ASSERT, 'thread is already running in ProcessAsync->process_begin()'); + } + + # Validate strProcess + if (!defined($strProcess)) + { + confess &log(ASSERT, 'strProcess must be defined in call to ProcessAsync->process_begin()'); + } + + if ($strProcess ne 'compress' && $strProcess ne 'decompress' && $strProcess ne 'checksum') + { + confess &log(ASSERT, 'strProcess must be (compress, decompress, checksum) in call to ProcessAsync->process_begin():' . + " ${strProcess} was passed"); + } + + # Validate hIn + if (!defined($hIn)) + { + confess &log(ASSERT, 'strProcess must be defined in call to ProcessAsync->process_begin()'); + } + + # Open the in/out pipes + my $hPipeIn; + + pipe $self->{hPipeOut}, $hPipeIn; + + # Queue the queue job with the thread + $self->{oThreadQueue}->enqueue("${strProcess}:" . fileno($hIn) . ',' . fileno($hPipeIn)); + + # Wait for the thread to acknowledge that it has duplicated the file handles + my $strMessage = $self->{oThreadResult}->dequeue(); + + # Close input pipe so that thread has the only copy + if ($strMessage eq 'running') + { + close($hPipeIn); + } + # If any other message is returned then error + else + { + confess "unknown thread message while waiting for running: ${strMessage}"; + } + + return $self->{hPipeOut}; +} + +#################################################################################################################################### +# process_end +# +# Ends the de/compress/checksum operation +#################################################################################################################################### +sub process_end +{ + my $self = shift; + + # Check if thread is not running + if (!defined($self->{hPipeOut})) + { + confess &log(ASSERT, 'thread is not running in ProcessAsync->process_end()'); + } + + # Make sure the de/compress pipes are closed + close($self->{hPipeOut}); + $self->{hPipeOut} = undef; + + # Wait for the thread to acknowledge that it has completed + my $strMessage = $self->{oThreadResult}->dequeue(); + + if ($strMessage ne 'complete') + { + confess "unknown thread message while waiting for complete: ${strMessage}"; + } +} + +true; diff --git a/test/lib/BackRestTest/FileTest.pm b/test/lib/BackRestTest/FileTest.pm index 33b1b7590..ab53027d8 100755 --- a/test/lib/BackRestTest/FileTest.pm +++ b/test/lib/BackRestTest/FileTest.pm @@ -788,7 +788,7 @@ sub BackRestTestFile_Test &log(INFO, '--------------------------------------------------------------------------------'); &log(INFO, "test File->hash()\n"); - for (my $bRemote = 0; $bRemote <= 1; $bRemote++) + for (my $bRemote = false; $bRemote <= true; $bRemote++) { my $oFile = BackRest::File->new ( @@ -799,13 +799,13 @@ sub BackRestTestFile_Test ); # Loop through error - for (my $bError = 0; $bError <= 1; $bError++) + for (my $bError = false; $bError <= true; $bError++) { # Loop through exists - for (my $bExists = 0; $bExists <= 1; $bExists++) + for (my $bExists = false; $bExists <= true; $bExists++) { # Loop through exists - for (my $bCompressed = 0; $bCompressed <= 1; $bCompressed++) + for (my $bCompressed = false; $bCompressed <= true; $bCompressed++) { if (!BackRestTestCommon_Run(++$iRun, "rmt ${bRemote}, err ${bError}, exists ${bExists}, cmp ${bCompressed}")) {next} @@ -826,6 +826,12 @@ sub BackRestTestFile_Test else { system("echo 'TESTDATA' > ${strFile}"); + + if ($bCompressed && !$bRemote) + { + $oFile->compress(PATH_BACKUP_ABSOLUTE, $strFile); + $strFile = $strFile . '.gz'; + } } # Execute in eval in case of error