1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2026-05-22 10:15:16 +02:00

Add HTTP retries to harden against transient S3 network errors.

This commit is contained in:
David Steele
2017-09-03 16:48:41 -04:00
parent 80e5e8db01
commit eea2ccc3ab
5 changed files with 198 additions and 117 deletions
+4
View File
@@ -57,6 +57,10 @@
<release-item> <release-item>
<p>Increase HTTP timeout for S3.</p> <p>Increase HTTP timeout for S3.</p>
</release-item> </release-item>
<release-item>
<p>Add HTTP retries to harden against transient S3 network errors.</p>
</release-item>
</release-refactor-list> </release-refactor-list>
</release-core-list> </release-core-list>
+164 -115
View File
@@ -54,7 +54,9 @@ sub new
$hQuery, $hQuery,
$hRequestHeader, $hRequestHeader,
$rstrRequestBody, $rstrRequestBody,
$bResponseBodyPrefetch,
$iProtocolTimeout, $iProtocolTimeout,
$iTryTotal,
$lBufferMax, $lBufferMax,
$bVerifySsl, $bVerifySsl,
$strCaPath, $strCaPath,
@@ -70,138 +72,182 @@ sub new
{name => 'hQuery', optional => true, trace => true}, {name => 'hQuery', optional => true, trace => true},
{name => 'hRequestHeader', optional => true, trace => true}, {name => 'hRequestHeader', optional => true, trace => true},
{name => 'rstrRequestBody', optional => true, trace => true}, {name => 'rstrRequestBody', optional => true, trace => true},
{name => 'iProtocolTimeout', optional => true, default => 90, trace => true}, {name => 'bResponseBodyPrefetch', optional => true, default => false, trace => true},
{name => 'iProtocolTimeout', optional => true, default => 300, trace => true},
{name => 'iTryTotal', optional => true, default => 3, trace => true},
{name => 'lBufferMax', optional => true, default => 32768, trace => true}, {name => 'lBufferMax', optional => true, default => 32768, trace => true},
{name => 'bVerifySsl', optional => true, default => true, trace => true}, {name => 'bVerifySsl', optional => true, default => true, trace => true},
{name => 'strCaPath', optional => true, trace => true}, {name => 'strCaPath', optional => true, trace => true},
{name => 'strCaFile', optional => true, trace => true}, {name => 'strCaFile', optional => true, trace => true},
); );
# Connect to the server # Retry as many times as requested
my $oSocket; my $self;
my $iTry = 1;
my $bRetry;
eval do
{ {
$oSocket = IO::Socket::SSL->new( # Disable logging if a failure will be retried
PeerHost => $strHost, PeerPort => $iPort, SSL_verify_mode => $bVerifySsl ? SSL_VERIFY_PEER : SSL_VERIFY_NONE, logDisable() if $iTry < $iTryTotal;
SSL_ca_path => $strCaPath, SSL_ca_file => $strCaFile); $bRetry = false;
return 1; eval
}
or do
{
logErrorResult(ERROR_HOST_CONNECT, $EVAL_ERROR);
};
# Check for errors
if (!defined($oSocket))
{
logErrorResult(
ERROR_HOST_CONNECT, coalesce(length($!) == 0 ? undef : $!, $SSL_ERROR), length($!) > 0 ? $SSL_ERROR : undef);
}
# Create the buffered IO object
my $self = new pgBackRest::Common::Io::Buffered(
new pgBackRest::Common::Io::Handle('httpClient', $oSocket, $oSocket), $iProtocolTimeout, $lBufferMax);
# Bless with the class
@ISA = $self->isA(); ## no critic (ClassHierarchies::ProhibitExplicitISA)
bless $self, $class;
# Store socket
$self->{oSocket} = $oSocket;
# Generate the query string
my $strQuery = httpQuery($hQuery);
# Construct the request headers
$self->{strRequestHeader} = "${strVerb} ${strUri}?${strQuery} HTTP/1.1" . "\r\n";
foreach my $strHeader (sort(keys(%{$hRequestHeader})))
{
$self->{strRequestHeader} .= "${strHeader}: $hRequestHeader->{$strHeader}\r\n";
}
$self->{strRequestHeader} .= "\r\n";
# Write request headers
$self->write(\$self->{strRequestHeader});
# Write content
if (defined($rstrRequestBody))
{
my $iTotalSize = length($$rstrRequestBody);
my $iTotalSent = 0;
# Write the request body in buffer-sized chunks
do
{ {
my $strBufferWrite = substr($$rstrRequestBody, $iTotalSent, $lBufferMax); # Connect to the server
$iTotalSent += $self->write(\$strBufferWrite); my $oSocket;
} while ($iTotalSent < $iTotalSize);
}
# Read response code eval
($self->{strResponseProtocol}, $self->{iResponseCode}, $self->{strResponseMessage}) = split(' ', trim($self->readLine()));
# Read the response headers
$self->{iContentLength} = undef;
$self->{strResponseHeader} = '';
my $strHeader = trim($self->readLine());
while ($strHeader ne '')
{
# Validate header
$self->{strResponseHeader} .= "${strHeader}\n";
my $iColonPos = index($strHeader, ':');
if ($iColonPos == -1)
{
confess &log(ERROR, "http header '${strHeader}' requires colon separator", ERROR_PROTOCOL);
}
# Parse header
my $strHeaderKey = lc(substr($strHeader, 0, $iColonPos));
my $strHeaderValue = trim(substr($strHeader, $iColonPos + 1));
# Store the header
$self->{hResponseHeader}{$strHeaderKey} = $strHeaderValue;
# Process content length
if ($strHeaderKey eq HTTP_HEADER_CONTENT_LENGTH)
{
$self->{iContentLength} = $strHeaderValue + 0;
$self->{iContentRemaining} = $self->{iContentLength};
}
# Process transfer encoding (only chunked is supported)
elsif ($strHeaderKey eq HTTP_HEADER_TRANSFER_ENCODING)
{
if ($strHeaderValue eq 'chunked')
{ {
$self->{iContentLength} = -1; $oSocket = IO::Socket::SSL->new(
PeerHost => $strHost, PeerPort => $iPort, SSL_verify_mode => $bVerifySsl ? SSL_VERIFY_PEER : SSL_VERIFY_NONE,
SSL_ca_path => $strCaPath, SSL_ca_file => $strCaFile);
return 1;
} }
else or do
{ {
confess &log(ERROR, "invalid value '${strHeaderValue} for http header '${strHeaderKey}'", ERROR_PROTOCOL); logErrorResult(ERROR_HOST_CONNECT, $EVAL_ERROR);
};
# Check for errors
if (!defined($oSocket))
{
logErrorResult(
ERROR_HOST_CONNECT, coalesce(length($!) == 0 ? undef : $!, $SSL_ERROR), length($!) > 0 ? $SSL_ERROR : undef);
} }
# Create the buffered IO object
$self = new pgBackRest::Common::Io::Buffered(
new pgBackRest::Common::Io::Handle('httpClient', $oSocket, $oSocket), $iProtocolTimeout, $lBufferMax);
# Bless with the class
@ISA = $self->isA(); ## no critic (ClassHierarchies::ProhibitExplicitISA)
bless $self, $class;
# Store socket
$self->{oSocket} = $oSocket;
# Generate the query string
my $strQuery = httpQuery($hQuery);
# Construct the request headers
$self->{strRequestHeader} = "${strVerb} ${strUri}?${strQuery} HTTP/1.1" . "\r\n";
foreach my $strHeader (sort(keys(%{$hRequestHeader})))
{
$self->{strRequestHeader} .= "${strHeader}: $hRequestHeader->{$strHeader}\r\n";
}
$self->{strRequestHeader} .= "\r\n";
# Write request headers
$self->write(\$self->{strRequestHeader});
# Write content
if (defined($rstrRequestBody))
{
my $iTotalSize = length($$rstrRequestBody);
my $iTotalSent = 0;
# Write the request body in buffer-sized chunks
do
{
my $strBufferWrite = substr($$rstrRequestBody, $iTotalSent, $lBufferMax);
$iTotalSent += $self->write(\$strBufferWrite);
} while ($iTotalSent < $iTotalSize);
}
# Read response code
($self->{strResponseProtocol}, $self->{iResponseCode}, $self->{strResponseMessage}) =
split(' ', trim($self->readLine()));
# Read the response headers
$self->{iContentLength} = undef;
$self->{strResponseHeader} = '';
my $strHeader = trim($self->readLine());
while ($strHeader ne '')
{
# Validate header
$self->{strResponseHeader} .= "${strHeader}\n";
my $iColonPos = index($strHeader, ':');
if ($iColonPos == -1)
{
confess &log(ERROR, "http header '${strHeader}' requires colon separator", ERROR_PROTOCOL);
}
# Parse header
my $strHeaderKey = lc(substr($strHeader, 0, $iColonPos));
my $strHeaderValue = trim(substr($strHeader, $iColonPos + 1));
# Store the header
$self->{hResponseHeader}{$strHeaderKey} = $strHeaderValue;
# Process content length
if ($strHeaderKey eq HTTP_HEADER_CONTENT_LENGTH)
{
$self->{iContentLength} = $strHeaderValue + 0;
$self->{iContentRemaining} = $self->{iContentLength};
}
# Process transfer encoding (only chunked is supported)
elsif ($strHeaderKey eq HTTP_HEADER_TRANSFER_ENCODING)
{
if ($strHeaderValue eq 'chunked')
{
$self->{iContentLength} = -1;
}
else
{
confess &log(ERROR, "invalid value '${strHeaderValue} for http header '${strHeaderKey}'", ERROR_PROTOCOL);
}
}
# Read next header
$strHeader = trim($self->readLine());
}
# Test response code
if ($self->{iResponseCode} == 200)
{
# Content length should have been defined either by content-length or transfer encoding
if (!defined($self->{iContentLength}))
{
confess &log(ERROR,
HTTP_HEADER_CONTENT_LENGTH . ' or ' . HTTP_HEADER_TRANSFER_ENCODING . ' must be defined', ERROR_PROTOCOL);
}
}
# Prefetch response - mostly useful when the response is known to be short
if ($bResponseBodyPrefetch)
{
$self->{strResponseBody} = $self->responseBody();
}
# Enable logging if a failure will be retried
logEnable() if $iTry < $iTryTotal;
return 1;
} }
or do
# Read next header
$strHeader = trim($self->readLine());
}
# Test response code
if ($self->{iResponseCode} == 200)
{
# Content length should have been defined either by content-length or transfer encoding
if (!defined($self->{iContentLength}))
{ {
confess &log(ERROR, # Enable logging if a failure will be retried
HTTP_HEADER_CONTENT_LENGTH . ' or ' . HTTP_HEADER_TRANSFER_ENCODING . ' must be defined', ERROR_PROTOCOL); logEnable() if $iTry < $iTryTotal;
}
# If tries reaches total allowed then error
if ($iTry == $iTryTotal)
{
confess $EVAL_ERROR;
}
# Try again
$iTry++;
$bRetry = true;
};
} }
while ($bRetry);
# Return from function and log return values if any # Return from function and log return values if any
return logDebugReturn return logDebugReturn
@@ -261,9 +307,12 @@ sub responseBody
__PACKAGE__ . '->responseBody' __PACKAGE__ . '->responseBody'
); );
# Return prefetched response body if it exists
return $self->{strResponseBody} if exists($self->{strResponseBody});
# Fetch response body if content length is not 0
my $strResponseBody = undef; my $strResponseBody = undef;
# Nothing to do if content length is 0
if ($self->{iContentLength} != 0) if ($self->{iContentLength} != 0)
{ {
# Transfer encoding is chunked # Transfer encoding is chunked
+2 -1
View File
@@ -163,7 +163,8 @@ sub request
$self->{strHost}, $strVerb, $self->{strHost}, $strVerb,
{iPort => $self->{iPort}, strUri => $strUri, hQuery => $hQuery, hRequestHeader => $hHeader, {iPort => $self->{iPort}, strUri => $strUri, hQuery => $hQuery, hRequestHeader => $hHeader,
rstrRequestBody => $rstrBody, bVerifySsl => $self->{bVerifySsl}, strCaPath => $self->{strCaPath}, rstrRequestBody => $rstrBody, bVerifySsl => $self->{bVerifySsl}, strCaPath => $self->{strCaPath},
strCaFile => $self->{strCaFile}, lBufferMax => $self->{lBufferMax}}); strCaFile => $self->{strCaFile}, bResponseBodyPrefetch => $strResponseType eq S3_RESPONSE_TYPE_XML,
lBufferMax => $self->{lBufferMax}});
# Check response code # Check response code
my $iResponseCode = $oHttpClient->responseCode(); my $iResponseCode = $oHttpClient->responseCode();
+1 -1
View File
@@ -90,7 +90,7 @@ my $oTestDef =
[ [
{ {
&TESTDEF_NAME => 'http-client', &TESTDEF_NAME => 'http-client',
&TESTDEF_TOTAL => 1, &TESTDEF_TOTAL => 2,
&TESTDEF_COVERAGE => &TESTDEF_COVERAGE =>
{ {
@@ -141,6 +141,33 @@ sub run
$self->testResult(sub {${$oHttpClient->responseBody()}}, $strTestData, 'response body read'); $self->testResult(sub {${$oHttpClient->responseBody()}}, $strTestData, 'response body read');
} }
################################################################################################################################
if ($self->begin('retry'))
{
$self->httpsServer(sub
{
$self->httpsServerAccept();
$self->{oConnection}->write("HTTP/1.1 200 NoContentLengthMessage1\r\n\r\n");
$self->httpsServerAccept();
$self->{oConnection}->write("HTTP/1.1 200 NoContentLengthMessage2\r\n\r\n");
$self->httpsServerAccept();
$self->httpsServerResponse(200, $strTestData);
});
#---------------------------------------------------------------------------------------------------------------------------
$self->testException(
sub {new pgBackRest::Common::Http::Client(
$strTestHost, HTTP_VERB_GET, {iPort => HTTPS_TEST_PORT, bVerifySsl => false, iTryTotal => 1})},
ERROR_PROTOCOL, 'content-length or transfer-encoding must be defined');
$self->testResult(
sub {new pgBackRest::Common::Http::Client(
$strTestHost, HTTP_VERB_GET, {iPort => HTTPS_TEST_PORT, bVerifySsl => false, iTryTotal => 2})},
'[object]', 'successful retries');
}
} }
1; 1;