1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-07-07 00:35:37 +02:00
Files
pgbackrest/src/common/io/http/request.c

314 lines
13 KiB
C
Raw Normal View History

Asynchronous S3 multipart upload. When uploading large files the upload is split into multiple parts which are assembled at the end to create the final file. Previously we waited until each part was acknowledged before starting on the processing (e.g. compression) of the next part. Now, the request for each part is sent while processing continues and the response is read just before sending the request for the next part. This asynchronous method allows us to continue processing while the S3 server formulates a response. Testing from outside AWS in a high-bandwidth, low-latency environment showed a 35% improvement in the upload time of 1GB files. The time spent waiting for multipart notifications showed a ~300% improvement (this measurement included the final part, which is not uploaded asynchronously). There are still some possible improvements: 1) the creation of the multipart upload ID could be made asynchronous when it looks like the upload will need to be multipart (this may incur cost if the upload turns out not to be multipart); 2) allow more than one async request (this will use more memory). A fair amount of refactoring was required to make the HTTP responses asynchronous. This may seem like overkill, but having well-defined request, response, and session objects will also be advantageous for the upcoming HTTP server functionality. Another advantage is that the lifecycle of an HttpSession is better defined. We only want to reuse sessions that complete the request/response cycle successfully; otherwise we consider the session to be in a bad state and would prefer to start clean with a new one. Previously, this required complex notifications to mark a session as "successfully done". Now, ownership of the session is passed to the request and then the response, and it is only returned to the client after a successful response.
If an error occurs anywhere along the way the session will be automatically closed by the object destructor when the request/response object is freed (depending on which one currently owns the session).
2020-06-24 13:44:00 -04:00
/***********************************************************************************************************************************
HTTP Request
***********************************************************************************************************************************/
#include "build.auto.h"
#include "common/debug.h"
#include "common/io/http/common.h"
#include "common/io/http/request.h"
#include "common/log.h"
#include "common/type/object.h"
#include "common/wait.h"
/***********************************************************************************************************************************
HTTP constants

NOTE(review): STRING_EXTERN presumably defines a constant String (the *_STR name) for each zero-terminated macro (HTTP_VERSION,
HTTP_VERB_*, HTTP_HEADER_*), which are expected to be declared in common/io/http/request.h -- confirm there.
***********************************************************************************************************************************/
// Protocol version written at the end of the request line
STRING_EXTERN(HTTP_VERSION_STR, HTTP_VERSION);

// Request verbs
STRING_EXTERN(HTTP_VERB_DELETE_STR, HTTP_VERB_DELETE);
STRING_EXTERN(HTTP_VERB_GET_STR, HTTP_VERB_GET);
STRING_EXTERN(HTTP_VERB_HEAD_STR, HTTP_VERB_HEAD);
STRING_EXTERN(HTTP_VERB_POST_STR, HTTP_VERB_POST);
STRING_EXTERN(HTTP_VERB_PUT_STR, HTTP_VERB_PUT);

// Header names
STRING_EXTERN(HTTP_HEADER_AUTHORIZATION_STR, HTTP_HEADER_AUTHORIZATION);
STRING_EXTERN(HTTP_HEADER_CONTENT_LENGTH_STR, HTTP_HEADER_CONTENT_LENGTH);
STRING_EXTERN(HTTP_HEADER_CONTENT_MD5_STR, HTTP_HEADER_CONTENT_MD5);
STRING_EXTERN(HTTP_HEADER_ETAG_STR, HTTP_HEADER_ETAG);
STRING_EXTERN(HTTP_HEADER_HOST_STR, HTTP_HEADER_HOST);
STRING_EXTERN(HTTP_HEADER_LAST_MODIFIED_STR, HTTP_HEADER_LAST_MODIFIED);

// 5xx errors that should always be retried. httpRequestProcess() compares this to the response code divided by 100, i.e. the first
// digit (class) of the status code.
#define HTTP_RESPONSE_CODE_RETRY_CLASS 5
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
struct HttpRequest
{
    MemContext *memContext;                                         // Mem context that holds the request and its copied fields
    HttpClient *client;                                             // HTTP client (not copied; used to open sessions and for timeout)
    const String *verb;                                             // HTTP verb (GET, POST, etc.)
    const String *uri;                                              // HTTP URI (encoded with httpUriEncode() when written)
    const HttpQuery *query;                                         // HTTP query (may be NULL)
    const HttpHeader *header;                                       // HTTP headers (never NULL; empty when none were passed)
    const Buffer *content;                                          // HTTP content (NULL when there is no content)
    HttpSession *session;                                           // Session for async requests: set after the request is sent
                                                                    // without reading the response, cleared when it is read
};

// NOTE(review): these presumably generate httpRequestMove()/httpRequestFree() and getters such as httpRequestVerb() -- see
// common/type/object.h to confirm the generated names
OBJECT_DEFINE_MOVE(HTTP_REQUEST);
OBJECT_DEFINE_FREE(HTTP_REQUEST);

OBJECT_DEFINE_GET(Verb, const, HTTP_REQUEST, const String *, verb);
OBJECT_DEFINE_GET(Uri, const, HTTP_REQUEST, const String *, uri);
OBJECT_DEFINE_GET(Query, const, HTTP_REQUEST, const HttpQuery *, query);
OBJECT_DEFINE_GET(Header, const, HTTP_REQUEST, const HttpHeader *, header);
/***********************************************************************************************************************************
Process the request

What happens depends on requestOnly:

- requestOnly == true: open a session from the client, write the request line, headers, and content, flush, then move the session into
  the request object (this->session) so the response can be read later. No response is read and result stays NULL.
- requestOnly == false: if a session was saved by an earlier requestOnly call, the request has already been sent and only the response
  is read. Otherwise the request is first sent on a newly opened session. The response is moved to the caller's memory context.

Any error, including a 5xx response (converted to ServiceError), is caught. The whole request is then resent on a new session, because
this->session has already been cleared. This repeats until the wait created from httpClientTimeout() expires, and the last error is
then rethrown. contentCache is passed through to httpResponseNew().
***********************************************************************************************************************************/
static HttpResponse *
httpRequestProcess(HttpRequest *this, bool requestOnly, bool contentCache)
{
    FUNCTION_LOG_BEGIN(logLevelDebug)
        FUNCTION_LOG_PARAM(HTTP_REQUEST, this);
        FUNCTION_LOG_PARAM(BOOL, requestOnly);
        FUNCTION_LOG_PARAM(BOOL, contentCache);
    FUNCTION_LOG_END();

    ASSERT(this != NULL);

    // HTTP Response (remains NULL when requestOnly is true)
    HttpResponse *result = NULL;

    MEM_CONTEXT_TEMP_BEGIN()
    {
        bool retry;

        // Created once, outside the loop, so the timeout bounds all attempts together
        Wait *wait = waitNew(httpClientTimeout(this->client));

        do
        {
            // Assume there will be no retry
            retry = false;

            TRY_BEGIN()
            {
                // Per-attempt context: the session (and the response of a failed attempt) live here. NOTE(review): presumably freed
                // when an error unwinds it, which per the commit notes closes the session
                MEM_CONTEXT_TEMP_BEGIN()
                {
                    HttpSession *session = NULL;

                    // If a session is saved then the request was already successfully sent. Take ownership and clear the field so
                    // that a retry after a failure below resends the request on a new session.
                    if (this->session != NULL)
                    {
                        session = httpSessionMove(this->session, memContextCurrent());
                        this->session = NULL;
                    }
                    // Else the request has not been sent yet or this is a retry
                    else
                    {
                        session = httpClientOpen(this->client);

                        // Write the request: "<verb> <encoded uri>[?<query>] " HTTP_VERSION followed by CR. NOTE(review):
                        // ioWriteStrLine() presumably appends the LF.
                        String *queryStr = httpQueryRender(this->query);

                        ioWriteStrLine(
                            httpSessionIoWrite(session),
                            strNewFmt(
                                "%s %s%s%s " HTTP_VERSION "\r", strPtr(this->verb), strPtr(httpUriEncode(this->uri, true)),
                                queryStr == NULL ? "" : "?", queryStr == NULL ? "" : strPtr(queryStr)));

                        // Write headers, one "key:value" line each
                        const StringList *headerList = httpHeaderList(this->header);

                        for (unsigned int headerIdx = 0; headerIdx < strLstSize(headerList); headerIdx++)
                        {
                            const String *headerKey = strLstGet(headerList, headerIdx);

                            ioWriteStrLine(
                                httpSessionIoWrite(session),
                                strNewFmt("%s:%s\r", strPtr(headerKey), strPtr(httpHeaderGet(this->header, headerKey))));
                        }

                        // Write out blank line to end the headers
                        ioWriteLine(httpSessionIoWrite(session), CR_BUF);

                        // Write out content if any
                        if (this->content != NULL)
                            ioWrite(httpSessionIoWrite(session), this->content);

                        // Flush all writes
                        ioWriteFlush(httpSessionIoWrite(session));

                        // If only performing the request then move the session to the object context, so it outlives this attempt's
                        // temp context and can be used later to read the response
                        if (requestOnly)
                            this->session = httpSessionMove(session, this->memContext);
                    }

                    // Wait for response
                    if (!requestOnly)
                    {
                        // NOTE(review): if the THROW below fires, result still points into this attempt's context. It is only read
                        // after a later successful attempt has overwritten it; on final failure the error is rethrown instead.
                        result = httpResponseNew(session, this->verb, contentCache);

                        // Retry when response code is 5xx. These errors generally represent a server error for a request that
                        // looks valid. There are a few errors that might be permanently fatal but they are rare and it seems best
                        // not to try and pick and choose errors in this class to retry.
                        if (httpResponseCode(result) / 100 == HTTP_RESPONSE_CODE_RETRY_CLASS)
                            THROW_FMT(ServiceError, "[%u] %s", httpResponseCode(result), strPtr(httpResponseReason(result)));

                        // Move response to outer temp context
                        httpResponseMove(result, memContextPrior());
                    }
                }
                MEM_CONTEXT_TEMP_END();
            }
            CATCH_ANY()
            {
                // Retry if wait time has not expired; otherwise propagate the last error to the caller
                if (waitMore(wait))
                {
                    LOG_DEBUG_FMT("retry %s: %s", errorTypeName(errorType()), errorMessage());
                    retry = true;
                    httpClientStat.retry++;
                }
                else
                    RETHROW();
            }
            TRY_END();
        }
        while (retry);

        // Move response to calling context (NOTE(review): result is NULL in requestOnly mode -- presumably a no-op then)
        httpResponseMove(result, memContextPrior());
    }
    MEM_CONTEXT_TEMP_END();

    FUNCTION_LOG_RETURN(HTTP_RESPONSE, result);
}
/***********************************************************************************************************************************
Create a new request and send it immediately

The verb, uri, query, header, and content are copied into the new object's memory context; a NULL header is replaced with an empty
one. The request is written by httpRequestProcess() in request-only mode. The session therefore stays in the object, and the response
is read later by httpRequest(). httpClientStat.request is incremented once the request has been sent.
***********************************************************************************************************************************/
HttpRequest *
httpRequestNew(HttpClient *client, const String *verb, const String *uri, HttpRequestNewParam param)
{
    FUNCTION_LOG_BEGIN(logLevelDebug)
        FUNCTION_LOG_PARAM(HTTP_CLIENT, client);
        FUNCTION_LOG_PARAM(STRING, verb);
        FUNCTION_LOG_PARAM(STRING, uri);
        FUNCTION_LOG_PARAM(HTTP_QUERY, param.query);
        FUNCTION_LOG_PARAM(HTTP_HEADER, param.header);
        FUNCTION_LOG_PARAM(BUFFER, param.content);
    FUNCTION_LOG_END();

    ASSERT(verb != NULL);
    ASSERT(uri != NULL);

    HttpRequest *this = NULL;

    MEM_CONTEXT_NEW_BEGIN("HttpRequest")
    {
        // Copy the optional parameters up front so the initializer below only has to assign them
        const HttpHeader *const headerCopy = param.header != NULL ? httpHeaderDup(param.header, NULL) : httpHeaderNew(NULL);
        const Buffer *const contentCopy = param.content != NULL ? bufDup(param.content) : NULL;

        this = memNew(sizeof(HttpRequest));

        // Any member not named here (i.e. session) is zero-initialized by the compound literal
        *this = (HttpRequest)
        {
            .memContext = MEM_CONTEXT_NEW(),
            .client = client,
            .verb = strDup(verb),
            .uri = strDup(uri),
            .query = httpQueryDup(param.query),
            .header = headerCopy,
            .content = contentCopy,
        };

        // Send the request without waiting for the response -- the session is stored in the object for httpRequest()
        httpRequestProcess(this, true, false);

        httpClientStat.request++;
    }
    MEM_CONTEXT_NEW_END();

    FUNCTION_LOG_RETURN(HTTP_REQUEST, this);
}
/***********************************************************************************************************************************
Read the response to a request created by httpRequestNew()

If reading fails or the server returns a 5xx, the request is resent on a new session until the client timeout expires. contentCache is
passed through to httpResponseNew(). The response is returned in the caller's memory context.
***********************************************************************************************************************************/
HttpResponse *
httpRequest(HttpRequest *this, bool contentCache)
{
    FUNCTION_LOG_BEGIN(logLevelDebug)
        FUNCTION_LOG_PARAM(HTTP_REQUEST, this);
        FUNCTION_LOG_PARAM(BOOL, contentCache);
    FUNCTION_LOG_END();

    ASSERT(this != NULL);

    // Process in response mode (requestOnly = false): read the response, resending on failure
    HttpResponse *const response = httpRequestProcess(this, false, contentCache);

    FUNCTION_LOG_RETURN(HTTP_RESPONSE, response);
}
/***********************************************************************************************************************************
Throw a ProtocolError describing a failed request

The message contains:
- the response code, and the reason when it is not empty;
- the encoded URI and the query;
- the request headers, with values flagged by httpHeaderRedact() replaced by <redacted>;
- the response headers;
- the response content.

This function always throws and never returns normally.
***********************************************************************************************************************************/
void
httpRequestError(const HttpRequest *this, HttpResponse *response)
{
    FUNCTION_LOG_BEGIN(logLevelTrace)
        FUNCTION_LOG_PARAM(HTTP_REQUEST, this);
        FUNCTION_LOG_PARAM(HTTP_RESPONSE, response);
    FUNCTION_LOG_END();

    ASSERT(this != NULL);
    ASSERT(response != NULL);

    // Error code
    String *error = strNewFmt("HTTP request failed with %u", httpResponseCode(response));

    // Add reason when present
    if (strSize(httpResponseReason(response)) > 0)
        strCatFmt(error, " (%s)", strPtr(httpResponseReason(response)));

    // Output uri/query
    strCatZ(error, ":\n*** URI/Query ***:");
    strCatFmt(error, "\n%s", strPtr(httpUriEncode(this->uri, true)));

    if (this->query != NULL)
    {
        // httpQueryRender() can return NULL (the request writer checks for this too), e.g. for a query with no keys. Never pass a
        // NULL string to "%s", and only add "?" when something follows it.
        const String *queryStr = httpQueryRender(this->query);

        if (queryStr != NULL)
            strCatFmt(error, "?%s", strPtr(queryStr));
    }

    // Output request headers
    const StringList *requestHeaderList = httpHeaderList(this->header);

    if (strLstSize(requestHeaderList) > 0)
    {
        strCatZ(error, "\n*** Request Headers ***:");

        for (unsigned int requestHeaderIdx = 0; requestHeaderIdx < strLstSize(requestHeaderList); requestHeaderIdx++)
        {
            const String *key = strLstGet(requestHeaderList, requestHeaderIdx);

            // Redact sensitive values so they do not end up in error messages/logs
            strCatFmt(
                error, "\n%s: %s", strPtr(key),
                httpHeaderRedact(this->header, key) ? "<redacted>" : strPtr(httpHeaderGet(this->header, key)));
        }
    }

    // Output response headers
    const HttpHeader *responseHeader = httpResponseHeader(response);
    const StringList *responseHeaderList = httpHeaderList(responseHeader);

    if (strLstSize(responseHeaderList) > 0)
    {
        strCatZ(error, "\n*** Response Headers ***:");

        for (unsigned int responseHeaderIdx = 0; responseHeaderIdx < strLstSize(responseHeaderList); responseHeaderIdx++)
        {
            const String *key = strLstGet(responseHeaderList, responseHeaderIdx);
            strCatFmt(error, "\n%s: %s", strPtr(key), strPtr(httpHeaderGet(responseHeader, key)));
        }
    }

    // Add response content, if any
    if (bufUsed(httpResponseContent(response)) > 0)
    {
        strCatZ(error, "\n*** Response Content ***:\n");
        strCat(error, strNewBuf(httpResponseContent(response)));
    }

    THROW(ProtocolError, strPtr(error));
}
/***********************************************************************************************************************************
Render the request for debug logging

Output has the form "{verb: ..., uri: ..., query: ..., header: ..., contentSize: N}". The query is shown as null when absent, headers
are rendered with httpHeaderToLog(), and the content is summarized by its size only (0 when there is no content).
***********************************************************************************************************************************/
String *
httpRequestToLog(const HttpRequest *this)
{
    // The closing brace was previously missing, leaving the rendered object unbalanced
    return strNewFmt(
        "{verb: %s, uri: %s, query: %s, header: %s, contentSize: %zu}",
        strPtr(this->verb), strPtr(this->uri), this->query == NULL ? "null" : strPtr(httpQueryToLog(this->query)),
        strPtr(httpHeaderToLog(this->header)), this->content == NULL ? 0 : bufUsed(this->content));
}