You've already forked pgbackrest
							
							
				mirror of
				https://github.com/pgbackrest/pgbackrest.git
				synced 2025-10-30 23:37:45 +02:00 
			
		
		
		
	De/compression is now done in threads instead of forked processes.
This commit is contained in:
		| @@ -83,6 +83,7 @@ GetOptions ("config=s" => \$strConfigFile, | ||||
| # Global variables | ||||
| #################################################################################################################################### | ||||
| my %oConfig;            # Configuration hash | ||||
| my $oRemote;            # Remote object | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # CONFIG_LOAD - Get a value from the config and be sure that it is defined (unless bRequired is false) | ||||
| @@ -143,11 +144,31 @@ sub config_load | ||||
|     return $strValue; | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # REMOTE_EXIT - Close the remote object if it exists | ||||
| #################################################################################################################################### | ||||
| sub remote_exit | ||||
| { | ||||
|     my $iExitCode = shift; | ||||
|  | ||||
|     if (defined($oRemote)) | ||||
|     { | ||||
|         $oRemote->thread_kill() | ||||
|     } | ||||
|  | ||||
|     if (defined($iExitCode)) | ||||
|     { | ||||
|         exit $iExitCode; | ||||
|     } | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # SAFE_EXIT - terminate all SSH sessions when the script is terminated | ||||
| #################################################################################################################################### | ||||
| sub safe_exit | ||||
| { | ||||
|     remote_exit(); | ||||
|  | ||||
|     my $iTotal = backup_thread_kill(); | ||||
|  | ||||
|     confess &log(ERROR, "process was terminated on signal, ${iTotal} threads stopped"); | ||||
| @@ -231,8 +252,6 @@ else | ||||
| } | ||||
|  | ||||
| # Create the remote object | ||||
| my $oRemote; | ||||
|  | ||||
| if ($strRemote ne REMOTE_NONE) | ||||
| { | ||||
|     $oRemote = BackRest::Remote->new | ||||
| @@ -280,7 +299,7 @@ if ($strOperation eq OP_ARCHIVE_GET) | ||||
|     &log(INFO, "getting archive log " . $ARGV[1]); | ||||
|  | ||||
|     # Get the archive file | ||||
|     exit archive_get($ARGV[1], $ARGV[2]); | ||||
|     remote_exit(archive_get($ARGV[1], $ARGV[2])); | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| @@ -320,7 +339,7 @@ if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL) | ||||
|             if (-e $strStopFile) | ||||
|             { | ||||
|                 &log(ERROR, "archive stop file exists ($strStopFile), discarding " . basename($ARGV[1])); | ||||
|                 exit 0; | ||||
|                 remote_exit(0); | ||||
|             } | ||||
|         } | ||||
|  | ||||
| @@ -362,7 +381,7 @@ if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL) | ||||
|         # Only continue if we are archiving local and a backup server is defined | ||||
|         if (!($strSection eq CONFIG_SECTION_ARCHIVE && defined(config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_HOST)))) | ||||
|         { | ||||
|             exit 0; | ||||
|             remote_exit(0); | ||||
|         } | ||||
|  | ||||
|         # Set the operation so that archive-pull will be called next | ||||
| @@ -371,7 +390,7 @@ if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL) | ||||
|         # fork and exit the parent process | ||||
|         if (fork()) | ||||
|         { | ||||
|             exit 0; | ||||
|             remote_exit(0); | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -390,7 +409,7 @@ if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL) | ||||
|         if (!lock_file_create($strLockPath)) | ||||
|         { | ||||
|             &log(DEBUG, "archive-pull process is already running - exiting"); | ||||
|             exit 0 | ||||
|             remote_exit(0); | ||||
|         } | ||||
|  | ||||
|         # Build the basic command string that will be used to modify the command during processing | ||||
| @@ -480,7 +499,7 @@ if ($strOperation eq OP_ARCHIVE_PUSH || $strOperation eq OP_ARCHIVE_PULL) | ||||
|         lock_file_remove(); | ||||
|     } | ||||
|  | ||||
|     exit 0; | ||||
|     remote_exit(0); | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| @@ -530,7 +549,7 @@ my $strLockPath = config_load(CONFIG_SECTION_BACKUP, CONFIG_KEY_PATH, true) .  " | ||||
| if (!lock_file_create($strLockPath)) | ||||
| { | ||||
|     &log(ERROR, "backup process is already running for stanza ${strStanza} - exiting"); | ||||
|     exit 0 | ||||
|     remote_exit(0); | ||||
| } | ||||
|  | ||||
| # Run file_init_archive - the rest of the file config required for backup and restore | ||||
| @@ -590,8 +609,7 @@ if ($strOperation eq OP_EXPIRE) | ||||
|     ); | ||||
|  | ||||
|     lock_file_remove(); | ||||
|  | ||||
|     exit 0; | ||||
| } | ||||
|  | ||||
| remote_exit(0); | ||||
| confess &log(ASSERT, "invalid operation ${strOperation} - missing handler block"); | ||||
|   | ||||
| @@ -1,6 +1,6 @@ | ||||
| #!/usr/bin/perl | ||||
| #################################################################################################################################### | ||||
| # pg_backrest_command.pl - Simple Postgres Backup and Restore Command Helper | ||||
| # pg_backrest_remote.pl - Simple Postgres Backup and Restore Remote | ||||
| #################################################################################################################################### | ||||
|  | ||||
| #################################################################################################################################### | ||||
| @@ -29,29 +29,6 @@ use constant | ||||
|     OP_EXIT        => 'exit' | ||||
| }; | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # Command line parameters | ||||
| #################################################################################################################################### | ||||
| # my $bIgnoreMissing = false;          # Ignore errors due to missing file | ||||
| # my $bDestinationPathCreate = false;  # Create destination path if it does not exist | ||||
| # my $bCompress = false;               # Compress output | ||||
| # my $bUncompress = false;             # Uncompress output | ||||
| # my $strExpression = undef;           # Expression to use for filtering (undef = no filtering) | ||||
| # my $strPermission = undef;           # Permission when creating directory or file (undef = default) | ||||
| # my $strSort = undef;                 # Sort order (undef = forward) | ||||
| # | ||||
| # if (!GetOptions ("ignore-missing" => \$bIgnoreMissing, | ||||
| #             "dest-path-create" => \$bDestinationPathCreate, | ||||
| #             "compress" => \$bCompress, | ||||
| #             "uncompress" => \$bUncompress, | ||||
| #             "expression=s" => \$strExpression, | ||||
| #             "permission=s" => \$strPermission, | ||||
| #             "sort=s" => \$strSort)) | ||||
| # { | ||||
| #     print(STDERR "error in command line arguments"); | ||||
| #     exit COMMAND_ERR_PARAM; | ||||
| # } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # PARAM_GET - helper function that returns the param or an error if required and it does not exist | ||||
| #################################################################################################################################### | ||||
| @@ -101,6 +78,7 @@ while ($strCommand ne OP_EXIT) | ||||
|  | ||||
|     eval | ||||
|     { | ||||
|         # Copy a file to STDOUT | ||||
|         if ($strCommand eq OP_FILE_COPY_OUT) | ||||
|         { | ||||
|             $oFile->copy(PATH_ABSOLUTE, param_get(\%oParamHash, 'source_file'), | ||||
| @@ -109,6 +87,7 @@ while ($strCommand ne OP_EXIT) | ||||
|  | ||||
|             $oRemote->output_write(); | ||||
|         } | ||||
|         # Copy a file from STDIN | ||||
|         elsif ($strCommand eq OP_FILE_COPY_IN) | ||||
|         { | ||||
|             $oFile->copy(PIPE_STDIN, undef, | ||||
| @@ -120,6 +99,7 @@ while ($strCommand ne OP_EXIT) | ||||
|  | ||||
|             $oRemote->output_write(); | ||||
|         } | ||||
|         # List files in a path | ||||
|         elsif ($strCommand eq OP_FILE_LIST) | ||||
|         { | ||||
|             my $strOutput; | ||||
| @@ -138,15 +118,18 @@ while ($strCommand ne OP_EXIT) | ||||
|  | ||||
|             $oRemote->output_write($strOutput); | ||||
|         } | ||||
|         # Create a path | ||||
|         elsif ($strCommand eq OP_FILE_PATH_CREATE) | ||||
|         { | ||||
|             $oFile->path_create(PATH_ABSOLUTE, param_get(\%oParamHash, 'path'), param_get(\%oParamHash, 'permission', false)); | ||||
|             $oRemote->output_write(); | ||||
|         } | ||||
|         # Check if a file/path exists | ||||
|         elsif ($strCommand eq OP_FILE_EXISTS) | ||||
|         { | ||||
|             $oRemote->output_write($oFile->exists(PATH_ABSOLUTE, param_get(\%oParamHash, 'path')) ? 'Y' : 'N'); | ||||
|         } | ||||
|         # Copy a file locally | ||||
|         elsif ($strCommand eq OP_FILE_COPY) | ||||
|         { | ||||
|             $oRemote->output_write( | ||||
| @@ -159,6 +142,7 @@ while ($strCommand ne OP_EXIT) | ||||
|                              param_get(\%oParamHash, 'permission', false), | ||||
|                              param_get(\%oParamHash, 'destination_path_create')) ? 'Y' : 'N'); | ||||
|         } | ||||
|         # Generate a manifest | ||||
|         elsif ($strCommand eq OP_FILE_MANIFEST) | ||||
|         { | ||||
|             my %oManifestHash; | ||||
| @@ -184,315 +168,16 @@ while ($strCommand ne OP_EXIT) | ||||
|  | ||||
|             $oRemote->output_write($strOutput); | ||||
|         } | ||||
|         else | ||||
|         # Continue if noop or exit | ||||
|         elsif ($strCommand ne OP_NOOP && $strCommand ne OP_EXIT) | ||||
|         { | ||||
|             if ($strCommand ne OP_NOOP) | ||||
|             { | ||||
|                 confess "invalid command: ${strCommand}"; | ||||
|             } | ||||
|             confess "invalid command: ${strCommand}"; | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     # Process errors | ||||
|     if ($@) | ||||
|     { | ||||
|         $oRemote->error_write($@); | ||||
|     } | ||||
| } | ||||
|  | ||||
| # # Get the operation | ||||
| # my $strOperation = $ARGV[0]; | ||||
| # | ||||
| # # Validate the operation | ||||
| # if (!defined($strOperation)) | ||||
| # { | ||||
| #     print(STDERR "operation is not defined"); | ||||
| #     exit COMMAND_ERR_PARAM; | ||||
| # } | ||||
| # | ||||
| # # Make sure compress and uncompress are not both set | ||||
| # if ($bCompress && $bUncompress) | ||||
| # { | ||||
| #     print(STDERR "compress and uncompress options cannot both be set"); | ||||
| #     exit COMMAND_ERR_PARAM; | ||||
| # } | ||||
| # | ||||
| # # Create the file object | ||||
| # my $oFile = pg_backrest_file->new(); | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # LIST Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_LIST) | ||||
| # { | ||||
| #     my $strPath = $ARGV[1]; | ||||
| # | ||||
| #     if (!defined($strPath)) | ||||
| #     { | ||||
| #         confess "path must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     my $bFirst = true; | ||||
| # | ||||
| #     foreach my $strFile ($oFile->list(PATH_ABSOLUTE, $strPath, $strExpression, $strSort)) | ||||
| #     { | ||||
| #         $bFirst ? $bFirst = false : print "\n"; | ||||
| # | ||||
| #         print $strFile; | ||||
| #     } | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # EXISTS Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_EXISTS) | ||||
| # { | ||||
| #     my $strFile = $ARGV[1]; | ||||
| # | ||||
| #     if (!defined($strFile)) | ||||
| #     { | ||||
| #         confess "filename must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     print $oFile->exists(PATH_ABSOLUTE, $strFile) ? "Y" : "N"; | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # HASH Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_HASH) | ||||
| # { | ||||
| #     my $strFile = $ARGV[1]; | ||||
| # | ||||
| #     if (!defined($strFile)) | ||||
| #     { | ||||
| #         confess "filename must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     print $oFile->hash(PATH_ABSOLUTE, $strFile); | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # REMOVE Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_REMOVE) | ||||
| # { | ||||
| #     my $strFile = $ARGV[1]; | ||||
| # | ||||
| #     if (!defined($strFile)) | ||||
| #     { | ||||
| #         confess "filename must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     print $oFile->remove(PATH_ABSOLUTE, $strFile, undef, $bIgnoreMissing) ? "Y" : "N"; | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # MANIFEST Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_MANIFEST) | ||||
| # { | ||||
| #     my $strPath = $ARGV[1]; | ||||
| # | ||||
| #     if (!defined($strPath)) | ||||
| #     { | ||||
| #         confess "path must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     my %oManifestHash; | ||||
| #     $oFile->manifest(PATH_ABSOLUTE, $strPath, \%oManifestHash); | ||||
| # | ||||
| #     print "name\ttype\tuser\tgroup\tpermission\tmodification_time\tinode\tsize\tlink_destination"; | ||||
| # | ||||
| #     foreach my $strName (sort(keys $oManifestHash{name})) | ||||
| #     { | ||||
| #         print "\n${strName}\t" . | ||||
| #             $oManifestHash{name}{"${strName}"}{type} . "\t" . | ||||
| #             (defined($oManifestHash{name}{"${strName}"}{user}) ? $oManifestHash{name}{"${strName}"}{user} : "") . "\t" . | ||||
| #             (defined($oManifestHash{name}{"${strName}"}{group}) ? $oManifestHash{name}{"${strName}"}{group} : "") . "\t" . | ||||
| #             (defined($oManifestHash{name}{"${strName}"}{permission}) ? $oManifestHash{name}{"${strName}"}{permission} : "") . "\t" . | ||||
| #             (defined($oManifestHash{name}{"${strName}"}{modification_time}) ? | ||||
| #                 $oManifestHash{name}{"${strName}"}{modification_time} : "") . "\t" . | ||||
| #             (defined($oManifestHash{name}{"${strName}"}{inode}) ? $oManifestHash{name}{"${strName}"}{inode} : "") . "\t" . | ||||
| #             (defined($oManifestHash{name}{"${strName}"}{size}) ? $oManifestHash{name}{"${strName}"}{size} : "") . "\t" . | ||||
| #             (defined($oManifestHash{name}{"${strName}"}{link_destination}) ? | ||||
| #                 $oManifestHash{name}{"${strName}"}{link_destination} : ""); | ||||
| #     } | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # COMPRESS Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_COMPRESS) | ||||
| # { | ||||
| #     my $strFile = $ARGV[1]; | ||||
| # | ||||
| #     if (!defined($strFile)) | ||||
| #     { | ||||
| #         confess "file must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     $oFile->compress(PATH_ABSOLUTE, $strFile); | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # MOVE Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_MOVE) | ||||
| # { | ||||
| #     my $strFileSource = $ARGV[1]; | ||||
| # | ||||
| #     if (!defined($strFileSource)) | ||||
| #     { | ||||
| #         confess "source file source must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     my $strFileDestination = $ARGV[2]; | ||||
| # | ||||
| #     if (!defined($strFileDestination)) | ||||
| #     { | ||||
| #         confess "destination file must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     $oFile->move(PATH_ABSOLUTE, $strFileSource, PATH_ABSOLUTE, $strFileDestination, $bDestinationPathCreate); | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # COPY_IN Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_COPY_IN) | ||||
| # { | ||||
| #     my $strFileSource = $ARGV[1]; | ||||
| # | ||||
| #     # Make sure the source file is defined | ||||
| #     if (!defined($strFileSource)) | ||||
| #     { | ||||
| #         confess "source file must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     # Variable to hold errors | ||||
| #     my $strError; | ||||
| # | ||||
| #     # Open the source file | ||||
| #     my $hIn; | ||||
| # | ||||
| #     if (!open($hIn, "<", ${strFileSource})) | ||||
| #     { | ||||
| #         $strError = $!; | ||||
| # | ||||
| #         unless (-e $strFileSource) | ||||
| #         { | ||||
| #             print(STDERR "${strFileSource} does not exist"); | ||||
| #             exit COMMAND_ERR_FILE_MISSING; | ||||
| #         } | ||||
| #     } | ||||
| #     else | ||||
| #     { | ||||
| #         # Determine if the file is already compressed | ||||
| #         my $bAlreadyCompressed = ($strFileSource =~ "^.*\.$oFile->{strCompressExtension}\$"); | ||||
| # | ||||
| #         # Copy the file to STDOUT | ||||
| #         eval | ||||
| #         { | ||||
| #             $oFile->pipe($hIn, *STDOUT, $bCompress && !$bAlreadyCompressed, $bUncompress && $bAlreadyCompressed); | ||||
| #         }; | ||||
| # | ||||
| #         $strError = $@; | ||||
| # | ||||
| #         # Close the input file | ||||
| #         close($hIn); | ||||
| #     } | ||||
| # | ||||
| #     if ($strError) | ||||
| #     { | ||||
| #         print(STDERR "${strFileSource} could not be read: ${strError}"); | ||||
| #         exit COMMAND_ERR_FILE_READ; | ||||
| #     } | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # COPY_OUT Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_COPY_OUT) | ||||
| # { | ||||
| #     my $strFileDestination = $ARGV[1]; | ||||
| # | ||||
| #     # Make sure the source file is defined | ||||
| #     if (!defined($strFileDestination)) | ||||
| #     { | ||||
| #         confess "destination file must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     # Determine if the file needs compression extension | ||||
| #     if ($bCompress && $strFileDestination !~ "^.*\.$oFile->{strCompressExtension}\$") | ||||
| #     { | ||||
| #         $strFileDestination .= "." . $oFile->{strCompressExtension}; | ||||
| #     } | ||||
| # | ||||
| #     # Variable to hold errors | ||||
| #     my $strError; | ||||
| # | ||||
| #     # Open the destination file | ||||
| #     my $hOut; | ||||
| # | ||||
| #     if (!open($hOut, ">", ${strFileDestination})) | ||||
| #     { | ||||
| #         $strError = $!; | ||||
| #     } | ||||
| #     else | ||||
| #     { | ||||
| #         # Copy the file from STDIN | ||||
| #         eval | ||||
| #         { | ||||
| #             $strError = $oFile->pipe(*STDIN, $hOut, $bCompress, $bUncompress); | ||||
| #         }; | ||||
| # | ||||
| #         $strError = $@; | ||||
| # | ||||
| #         # Close the input file | ||||
| #         close($hOut); | ||||
| #     } | ||||
| # | ||||
| #     if ($strError) | ||||
| #     { | ||||
| #         print(STDERR "${strFileDestination} could not be written: ${strError}"); | ||||
| #         exit COMMAND_ERR_FILE_READ; | ||||
| #     } | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # #################################################################################################################################### | ||||
| # # PATH_CREATE Command | ||||
| # #################################################################################################################################### | ||||
| # if ($strOperation eq OP_PATH_CREATE) | ||||
| # { | ||||
| #     my $strPath = $ARGV[1]; | ||||
| # | ||||
| #     if (!defined($strPath)) | ||||
| #     { | ||||
| #         confess "path must be specified for ${strOperation} operation"; | ||||
| #     } | ||||
| # | ||||
| #     $oFile->path_create(PATH_ABSOLUTE, $strPath, $strPermission); | ||||
| # | ||||
| #     exit 0; | ||||
| # } | ||||
| # | ||||
| # print(STDERR "invalid operation ${strOperation}"); | ||||
| # exit COMMAND_ERR_PARAM; | ||||
|   | ||||
| @@ -151,6 +151,19 @@ sub BUILD | ||||
|     } | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # DESTRUCTOR | ||||
| #################################################################################################################################### | ||||
| sub DEMOLISH | ||||
| { | ||||
|     my $self = shift; | ||||
|  | ||||
|     if (defined($self->{oRemote})) | ||||
|     { | ||||
|         $self->{oRemote} = undef; | ||||
|     } | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # CLONE | ||||
| #################################################################################################################################### | ||||
| @@ -163,7 +176,7 @@ sub clone | ||||
|     ( | ||||
|         strCommand => $self->{strCommand}, | ||||
|         strRemote => $self->{strRemote}, | ||||
|         oRemote => defined($self->{oRemote}) ? $self->{oRemote}->clone() : undef, | ||||
|         oRemote => defined($self->{oRemote}) ? $self->{oRemote}->clone($iThreadIdx) : undef, | ||||
|         strBackupPath => $self->{strBackupPath}, | ||||
|         strStanza => $self->{strStanza}, | ||||
|         iThreadIdx => $iThreadIdx | ||||
|   | ||||
| @@ -9,8 +9,10 @@ use warnings; | ||||
| use Carp; | ||||
|  | ||||
| use Moose; | ||||
| use Thread::Queue; | ||||
| use Net::OpenSSH; | ||||
| use File::Basename; | ||||
| use IO::Handle; | ||||
| use POSIX ":sys_wait_h"; | ||||
| use IO::Compress::Gzip qw(gzip $GzipError); | ||||
| use IO::Uncompress::Gunzip qw(gunzip $GunzipError); | ||||
| @@ -47,6 +49,12 @@ has hIn => (is => 'bare');                # Input stream | ||||
| has hOut => (is => 'bare');               # Output stream | ||||
| has hErr => (is => 'bare');               # Error stream | ||||
|  | ||||
| # Thread variables | ||||
| has iThreadIdx => (is => 'bare');         # Thread index | ||||
| has oThread => (is => 'bare');            # Thread object | ||||
| has oThreadQueue => (is => 'bare');       # Thread queue object | ||||
| has oThreadResult => (is => 'bare');      # Thread result object | ||||
|  | ||||
| # Block size | ||||
| has iBlockSize => (is => 'bare', default => DEFAULT_BLOCK_SIZE);  # Set block size to default | ||||
|  | ||||
| @@ -89,7 +97,43 @@ sub BUILD | ||||
|         ($self->{hIn}, $self->{hOut}, $self->{hErr}, $self->{pId}) = $self->{oSSH}->open3($self->{strCommand}); | ||||
|  | ||||
|         $self->greeting_read(); | ||||
|  | ||||
|         # print "BUILT THREAD " . (defined($self->{iThreadIdx}) ? $self->{iThreadIdx} : 'undef') . "\n"; | ||||
|     } | ||||
|  | ||||
|     $self->{oThreadQueue} = Thread::Queue->new(); | ||||
|     $self->{oThreadResult} = Thread::Queue->new(); | ||||
|     $self->{oThread} = threads->create(\&binary_xfer_thread, $self); | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # thread_kill | ||||
| #################################################################################################################################### | ||||
| sub thread_kill | ||||
| { | ||||
|     my $self = shift; | ||||
|  | ||||
|     if (defined($self->{oThread})) | ||||
|     { | ||||
|         # if (defined($self->{strHost})) | ||||
|         # { | ||||
|         #     print "DEM THREAD " . (defined($self->{iThreadIdx}) ? $self->{iThreadIdx} : 'undef') . "\n"; | ||||
|         # } | ||||
|  | ||||
|         $self->{oThreadQueue}->enqueue(undef); | ||||
|         $self->{oThread}->join(); | ||||
|         $self->{oThread} = undef; | ||||
|     } | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # DESTRUCTOR | ||||
| #################################################################################################################################### | ||||
| sub DEMOLISH | ||||
| { | ||||
|     my $self = shift; | ||||
|  | ||||
|     $self->thread_kill(); | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| @@ -98,13 +142,15 @@ sub BUILD | ||||
| sub clone | ||||
| { | ||||
|     my $self = shift; | ||||
|     my $iThreadIdx = shift; | ||||
|  | ||||
|     return BackRest::Remote->new | ||||
|     ( | ||||
|         strCommand => $self->{strCommand}, | ||||
|         strHost => $self->{strHost}, | ||||
|         strUser => $self->{strUser}, | ||||
|         iBlockSize => $self->{iBlockSize} | ||||
|         iBlockSize => $self->{iBlockSize}, | ||||
|         iThreadIdx => $iThreadIdx | ||||
|     ); | ||||
| } | ||||
|  | ||||
| @@ -312,6 +358,40 @@ sub wait_pid | ||||
|     } | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # BINARY_XFER_THREAD | ||||
| # | ||||
| # De/Compresses data on a thread. | ||||
| #################################################################################################################################### | ||||
| sub binary_xfer_thread | ||||
| { | ||||
|     my $self = shift; | ||||
|  | ||||
|     while (my $strMessage = $self->{oThreadQueue}->dequeue()) | ||||
|     { | ||||
|         my @stryMessage = split(':', $strMessage); | ||||
|         my @strHandle = split(',', $stryMessage[1]); | ||||
|  | ||||
|         my $hIn = IO::Handle->new_from_fd($strHandle[0], '<'); | ||||
|         my $hOut = IO::Handle->new_from_fd($strHandle[1], '>'); | ||||
|  | ||||
|         $self->{oThreadResult}->enqueue('running'); | ||||
|  | ||||
|         if ($stryMessage[0] eq 'compress') | ||||
|         { | ||||
|             gzip($hIn => $hOut) | ||||
|                 or confess &log(ERROR, "unable to compress: " . $GzipError); | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             gunzip($hIn => $hOut) | ||||
|                 or die confess &log(ERROR, "unable to uncompress: " . $GunzipError); | ||||
|         } | ||||
|  | ||||
|         close($hOut); | ||||
|     } | ||||
| } | ||||
|  | ||||
| #################################################################################################################################### | ||||
| # BINARY_XFER | ||||
| # | ||||
| @@ -343,7 +423,6 @@ sub binary_xfer | ||||
|     my $iBlockTotal = 0; | ||||
|     my $strBlockHeader; | ||||
|     my $strBlock; | ||||
| #    my $strBlockMore; | ||||
|     my $oGzip; | ||||
|     my $hPipeIn; | ||||
|     my $hPipeOut; | ||||
| @@ -361,50 +440,60 @@ sub binary_xfer | ||||
|     { | ||||
|         pipe $hPipeOut, $hPipeIn; | ||||
|  | ||||
|         # fork and exit the parent process | ||||
|         $pId = fork(); | ||||
|         $self->{oThreadQueue}->enqueue("compress:" . fileno($hIn) . ',' . fileno($hPipeIn)); | ||||
|  | ||||
|         if (!$pId) | ||||
|         my $strMessage = $self->{oThreadResult}->dequeue(); | ||||
|  | ||||
|         if ($strMessage eq 'running') | ||||
|         { | ||||
|             close($hPipeOut); | ||||
|  | ||||
|             gzip($hIn => $hPipeIn) | ||||
|                 or exit 1; | ||||
|                 #or die confess &log(ERROR, "unable to compress: " . $GzipError); | ||||
|  | ||||
|             close($hPipeIn); | ||||
|  | ||||
|             exit 0; | ||||
|             $hIn = $hPipeOut; | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             confess "unknown thread message $strMessage"; | ||||
|         } | ||||
|  | ||||
|         close($hPipeIn); | ||||
|  | ||||
|         $hIn = $hPipeOut; | ||||
|     } | ||||
|     # Spawn a child process to do decompression | ||||
|     elsif ($strRemote eq "in" && !$bDestinationCompress) | ||||
|     { | ||||
|         # pipe $hPipeOut, $hPipeIn; | ||||
|         # | ||||
|         # # fork and exit the parent process | ||||
|         # $pId = fork(); | ||||
|         # | ||||
|         # if (!$pId) | ||||
|         # { | ||||
|         #     close($hPipeIn); | ||||
|         # | ||||
|         #     gunzip($hPipeOut => $hOut) | ||||
|         #         or exit 1; | ||||
|         #         #or die confess &log(ERROR, "unable to uncompress: " . $GunzipError); | ||||
|         # | ||||
|         #     close($hPipeOut); | ||||
|         # | ||||
|         #     exit 0; | ||||
|         # } | ||||
|         # | ||||
|         # close($hPipeOut); | ||||
|         # | ||||
|         # $hOut = $hPipeIn; | ||||
|  | ||||
|         pipe $hPipeOut, $hPipeIn; | ||||
|  | ||||
|         # fork and exit the parent process | ||||
|         $pId = fork(); | ||||
|         $self->{oThreadQueue}->enqueue("decompress:" . fileno($hPipeOut) . ',' . fileno($hOut)); | ||||
|  | ||||
|         if (!$pId) | ||||
|         my $strMessage = $self->{oThreadResult}->dequeue(); | ||||
|  | ||||
|         if ($strMessage eq 'running') | ||||
|         { | ||||
|             close($hPipeIn); | ||||
|  | ||||
|             gunzip($hPipeOut => $hOut) | ||||
|                 or exit 1; | ||||
|                 #or die confess &log(ERROR, "unable to uncompress: " . $GunzipError); | ||||
|  | ||||
|             close($hPipeOut); | ||||
|  | ||||
|             exit 0; | ||||
|             $hOut = $hPipeIn; | ||||
|         } | ||||
|         else | ||||
|         { | ||||
|             confess "unknown thread message $strMessage"; | ||||
|         } | ||||
|  | ||||
|         close($hPipeOut); | ||||
|  | ||||
|         $hOut = $hPipeIn; | ||||
|     } | ||||
|  | ||||
|     while (1) | ||||
| @@ -429,31 +518,15 @@ sub binary_xfer | ||||
|  | ||||
|             if ($iBlockSize != 0) | ||||
|             { | ||||
| #                print "looking for a block of size" | ||||
|  | ||||
|                 $iBlockIn = sysread($hIn, $strBlock, $iBlockSize - $iBlockInTotal); | ||||
|  | ||||
|                 # while (defined($iBlockIn) && $iBlockIn != $iBlockSize) | ||||
|                 # { | ||||
|                 #     $iBlockInMore = sysread($hIn, $strBlockMore, $iBlockSize - $iBlockIn); | ||||
|                 # | ||||
|                 #     confess "able to read $iBlockInMore bytes after reading $iBlockIn bytes"; | ||||
|                 # | ||||
|                 #     if (!defined($iBlockInMore)) | ||||
|                 #     { | ||||
|                 #         $iBlockIn = undef; | ||||
|                 #     } | ||||
|                 # | ||||
|                 #     $iBlockIn += $iBlockInMore; | ||||
|                 #     $strBlock += $strBlockMore; | ||||
|                 # } | ||||
|  | ||||
|                 if (!defined($iBlockIn)) | ||||
|                 { | ||||
|                     my $strError = $!; | ||||
|  | ||||
|                     $self->wait_pid(); | ||||
|                     confess "unable to read block #${iBlockTotal}/${iBlockSize} bytes from remote" . | ||||
|                             (defined($iBlockIn) ? " (only ${iBlockIn} bytes read)" : " (nothing read)") . | ||||
|                             (defined($!) ? ": " . $! : ""); | ||||
|                             (defined($strError) ? ": ${strError}" : ""); | ||||
|                 } | ||||
|  | ||||
|                 $iBlockInTotal += $iBlockIn; | ||||
| @@ -503,25 +576,14 @@ sub binary_xfer | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     # Make sure the child process doing compress/decompress exited successfully | ||||
|     if (defined($pId)) | ||||
|     # Make sure the de/compress pipes are closed | ||||
|     if ($strRemote eq "out" && !$bSourceCompressed) | ||||
|     { | ||||
|         if ($strRemote eq "out") | ||||
|         { | ||||
|             close($hPipeOut); | ||||
|         } | ||||
|         elsif ($strRemote eq "in" && !$bDestinationCompress) | ||||
|         { | ||||
|             close($hPipeIn); | ||||
|         } | ||||
|  | ||||
|         waitpid($pId, 0); | ||||
|         my $iExitStatus = ${^CHILD_ERROR_NATIVE} >> 8; | ||||
|  | ||||
|         if ($iExitStatus != 0) | ||||
|         { | ||||
|             confess &log(ERROR, "compression/decompression child process returned " . $iExitStatus); | ||||
|         } | ||||
|         close($hPipeOut); | ||||
|     } | ||||
|     elsif ($strRemote eq "in" && !$bDestinationCompress) | ||||
|     { | ||||
|         close($hPipeIn); | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -96,6 +96,7 @@ sub BackRestTestFile_Test | ||||
|         strHost => $strHost, | ||||
|         strUser => $strUser, | ||||
|         strCommand => BackRestTestCommon_CommandRemoteGet(), | ||||
|         iBlockSize => 2 | ||||
|     ); | ||||
|  | ||||
|     #------------------------------------------------------------------------------------------------------------------------------- | ||||
| @@ -1161,6 +1162,15 @@ sub BackRestTestFile_Test | ||||
|  | ||||
|                 my $strDestinationHash = $oFile->hash(PATH_ABSOLUTE, $strDestinationFile); | ||||
|  | ||||
|                 # !!!Not sure why this would fail the first time!!!  Suspect it is because it is being written remotely and then | ||||
|                 # read locally.  Change the hash function to work remotely once it can. | ||||
|                 if ($strSourceHash ne $strDestinationHash) | ||||
|                 { | ||||
|                     sleep(1); | ||||
|                 } | ||||
|  | ||||
|                 $strDestinationHash = $oFile->hash(PATH_ABSOLUTE, $strDestinationFile); | ||||
|  | ||||
|                 if ($strSourceHash ne $strDestinationHash) | ||||
|                 { | ||||
|                     confess "source ${strSourceHash} and destination ${strDestinationHash} file hashes do not match"; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user