#################################################################################################################################### # S3 File Write #################################################################################################################################### package pgBackRest::Storage::S3::FileWrite; use parent 'pgBackRest::Common::Io::Base'; use strict; use warnings FATAL => qw(all); use Carp qw(confess); use English '-no_match_vars'; use Digest::MD5 qw(md5_base64); use Fcntl qw(O_RDONLY O_WRONLY O_CREAT O_TRUNC); use File::Basename qw(dirname); use pgBackRest::Common::Exception; use pgBackRest::Common::Io::Handle; use pgBackRest::Common::Log; use pgBackRest::Common::Xml; use pgBackRest::Storage::Base; use pgBackRest::Storage::S3::Request; #################################################################################################################################### # Constants #################################################################################################################################### use constant S3_BUFFER_MAX => 16777216; #################################################################################################################################### # CONSTRUCTOR #################################################################################################################################### sub new { my $class = shift; # Assign function parameters, defaults, and log debug info my ( $strOperation, $oDriver, $strName, ) = logDebugParam ( __PACKAGE__ . '->new', \@_, {name => 'oDriver', trace => true}, {name => 'strName', trace => true}, ); # Create the class hash my $self = $class->SUPER::new("'${strName}'"); bless $self, $class; # Set variables $self->{oDriver} = $oDriver; $self->{strName} = $strName; # Start with an empty buffer $self->{rtBuffer} = ''; # Has anything been written? $self->{bWritten} = false; # How much has been written? $self->{lSize} = 0; # Return from function and log return values if any return logDebugReturn ( $strOperation, {name => 'self', value => $self, trace => true} ); } #################################################################################################################################### # open - open the file #################################################################################################################################### sub open { my $self = shift; # Request an upload id my $oResponse = $self->{oDriver}->request( HTTP_VERB_POST, {strUri => $self->{strName}, hQuery => 'uploads=', strResponseType => S3_RESPONSE_TYPE_XML}); $self->{strUploadId} = xmlTagText($oResponse, 'UploadId'); # Intialize the multi-part array $self->{rstryMultiPart} = []; } #################################################################################################################################### # write - write data to a file #################################################################################################################################### sub write { my $self = shift; my $rtBuffer = shift; # Note that write has been called $self->{bWritten} = true; if (defined($rtBuffer)) { $self->{rtBuffer} .= $$rtBuffer; $self->{lSize} += length($$rtBuffer); # Wait until buffer is at least max before writing to avoid writing smaller files multi-part if (length($self->{rtBuffer}) >= S3_BUFFER_MAX) { $self->flush(); } return length($$rtBuffer); } return 0; } #################################################################################################################################### # flush - flush whatever is in the buffer out #################################################################################################################################### sub flush { my $self = shift; # Open file if it is not open already $self->open() if !$self->opened(); # Put a file $self->{oDriver}->request( HTTP_VERB_PUT, {strUri => $self->{strName}, hQuery => {'partNumber' => @{$self->{rstryMultiPart}} + 1, 'uploadId' => $self->{strUploadId}}, rstrBody => \$self->{rtBuffer}, hHeader => {'content-md5' => md5_base64($self->{rtBuffer}) . '=='}}); # Store the returned etag push(@{$self->{rstryMultiPart}}, $self->{oDriver}->{hResponseHeader}{&S3_HEADER_ETAG}); # Clear the buffer $self->{rtBuffer} = ''; } #################################################################################################################################### # close - close the file #################################################################################################################################### sub close { my $self = shift; # Only close if something was written if ($self->{bWritten}) { # Make sure close does not run again $self->{bWritten} = false; # If the file is open then multipart transfer has already started and must be completed if ($self->opened()) { # flush out whatever is in the buffer $self->flush(); my $strXml = XML_HEADER . ''; my $iPartNo = 0; foreach my $strETag (@{$self->{rstryMultiPart}}) { $iPartNo++; $strXml .= "${iPartNo}${strETag}"; } $strXml .= ''; # Finalize file my $oResponse = $self->{oDriver}->request( HTTP_VERB_POST, {strUri => $self->{strName}, hQuery => {'uploadId' => $self->{strUploadId}}, rstrBody => \$strXml, hHeader => {'content-md5' => md5_base64($strXml) . '=='}, strResponseType => S3_RESPONSE_TYPE_XML}); } # Else the file can be transmitted in one block else { $self->{oDriver}->request( HTTP_VERB_PUT, {strUri => $self->{strName}, rstrBody => \$self->{rtBuffer}, hHeader => {'content-md5' => md5_base64($self->{rtBuffer}) . '=='}}); } } # This is how we report the write size back to the caller. It's a bit hokey -- results are based on the object name and in all # other cases I/O passes through Io::Handle. It doesn't seem worth fixing since it could break things and this code is being # migrated to C anyway. $self->resultSet(COMMON_IO_HANDLE, $self->{lSize}); return true; } #################################################################################################################################### # Getters #################################################################################################################################### sub opened {defined(shift->{strUploadId})} sub name {shift->{strName}} 1;