1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2024-12-12 10:04:14 +02:00

Asynchronous S3 multipart upload.

When uploading large files the upload is split into multiple parts which are assembled at the end to create the final file. Previously we waited until each part was acknowledged before starting on the processing (i.e. compression, etc.) of the next part.

Now, the request for each part is sent while processing continues and the response is read just before sending the request for the next part. This asynchronous method allows us to continue processing while the S3 server formulates a response.

Testing from outside AWS in a high-bandwidth, low-latency environment showed a 35% improvement in the upload time of 1GB files. The time spent waiting for multipart notifications was reduced by ~300% (this measurement included the final part which is not uploaded asynchronously).

There are still some possible improvements: 1) the creation of the multipart id could be made asynchronous when it looks like the upload will need to be multipart (this may incur cost if the upload turns out not to be multipart). 2) allow more than one async request (this will use more memory).

A fair amount of refactoring was required to make the HTTP responses asynchronous. This may seem like overkill but having well-defined request, response, and session objects will also be advantageous for the upcoming HTTP server functionality.

Another advantage is that the lifecycle of an HttpSession is better defined. We only want to reuse sessions that complete the request/response cycle successfully, otherwise we consider the session to be in a bad state and would prefer to start clean with a new one. Previously, this required complex notifications to mark a session as "successfully done". Now, ownership of the session is passed to the request and then the response and only returned to the client after a successful response. If an error occurs anywhere along the way the session will be automatically closed by the object destructor when the request/response object is freed (depending on which one currently owns the session).
This commit is contained in:
David Steele 2020-06-24 13:44:00 -04:00 committed by GitHub
parent 45d9b03136
commit c5892d1291
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1491 additions and 1032 deletions

View File

@ -15,6 +15,10 @@
<release date="XXXX-XX-XX" version="2.28dev" title="UNDER DEVELOPMENT"> <release date="XXXX-XX-XX" version="2.28dev" title="UNDER DEVELOPMENT">
<release-core-list> <release-core-list>
<release-improvement-list> <release-improvement-list>
<release-item>
<p>Asynchronous S3 multipart upload.</p>
</release-item>
<release-item> <release-item>
<release-item-contributor-list> <release-item-contributor-list>
<release-item-reviewer id="cynthia.shang"/> <release-item-reviewer id="cynthia.shang"/>

View File

@ -73,11 +73,13 @@ SRCS = \
common/io/filter/size.c \ common/io/filter/size.c \
common/io/handleRead.c \ common/io/handleRead.c \
common/io/handleWrite.c \ common/io/handleWrite.c \
common/io/http/cache.c \
common/io/http/client.c \ common/io/http/client.c \
common/io/http/common.c \ common/io/http/common.c \
common/io/http/header.c \ common/io/http/header.c \
common/io/http/query.c \ common/io/http/query.c \
common/io/http/request.c \
common/io/http/response.c \
common/io/http/session.c \
common/io/io.c \ common/io/io.c \
common/io/read.c \ common/io/read.c \
common/io/socket/client.c \ common/io/socket/client.c \

View File

@ -19,10 +19,7 @@ cmdRepoCreate(void)
MEM_CONTEXT_TEMP_BEGIN() MEM_CONTEXT_TEMP_BEGIN()
{ {
if (strEq(storageType(storageRepo()), STORAGE_S3_TYPE_STR)) if (strEq(storageType(storageRepo()), STORAGE_S3_TYPE_STR))
{ storageS3RequestP((StorageS3 *)storageDriver(storageRepoWrite()), HTTP_VERB_PUT_STR, FSLASH_STR);
storageS3Request(
(StorageS3 *)storageDriver(storageRepoWrite()), HTTP_VERB_PUT_STR, FSLASH_STR, NULL, NULL, true, false);
}
} }
MEM_CONTEXT_TEMP_END(); MEM_CONTEXT_TEMP_END();

View File

@ -1,104 +0,0 @@
/***********************************************************************************************************************************
HTTP Client Cache
***********************************************************************************************************************************/
#include "build.auto.h"
#include "common/debug.h"
#include "common/io/http/cache.h"
#include "common/log.h"
#include "common/type/list.h"
#include "common/type/object.h"
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
struct HttpClientCache
{
MemContext *memContext; // Mem context
const String *host; // Client settings
unsigned int port;
TimeMSec timeout;
bool verifyPeer;
const String *caFile;
const String *caPath;
List *clientList; // List of HTTP clients
};
OBJECT_DEFINE_FREE(HTTP_CLIENT_CACHE);
/**********************************************************************************************************************************/
HttpClientCache *
httpClientCacheNew(
const String *host, unsigned int port, TimeMSec timeout, bool verifyPeer, const String *caFile, const String *caPath)
{
FUNCTION_LOG_BEGIN(logLevelDebug)
FUNCTION_LOG_PARAM(STRING, host);
FUNCTION_LOG_PARAM(UINT, port);
FUNCTION_LOG_PARAM(TIME_MSEC, timeout);
FUNCTION_LOG_PARAM(BOOL, verifyPeer);
FUNCTION_LOG_PARAM(STRING, caFile);
FUNCTION_LOG_PARAM(STRING, caPath);
FUNCTION_LOG_END();
ASSERT(host != NULL);
HttpClientCache *this = NULL;
MEM_CONTEXT_NEW_BEGIN("HttpClientCache")
{
// Allocate state and set context
this = memNew(sizeof(HttpClientCache));
*this = (HttpClientCache)
{
.memContext = MEM_CONTEXT_NEW(),
.host = strDup(host),
.port = port,
.timeout = timeout,
.verifyPeer = verifyPeer,
.caFile = strDup(caFile),
.caPath = strDup(caPath),
.clientList = lstNew(sizeof(HttpClient *)),
};
}
MEM_CONTEXT_NEW_END();
FUNCTION_LOG_RETURN(HTTP_CLIENT_CACHE, this);
}
/**********************************************************************************************************************************/
HttpClient *
httpClientCacheGet(HttpClientCache *this)
{
FUNCTION_LOG_BEGIN(logLevelTrace)
FUNCTION_LOG_PARAM(HTTP_CLIENT_CACHE, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
HttpClient *result = NULL;
// Search for a client that is not busy
for (unsigned int clientIdx = 0; clientIdx < lstSize(this->clientList); clientIdx++)
{
HttpClient *httpClient = *(HttpClient **)lstGet(this->clientList, clientIdx);
if (!httpClientBusy(httpClient))
result = httpClient;
}
// If none found then create a new one
if (result == NULL)
{
MEM_CONTEXT_BEGIN(this->memContext)
{
result = httpClientNew(this->host, this->port, this->timeout, this->verifyPeer, this->caFile, this->caPath);
lstAdd(this->clientList, &result);
}
MEM_CONTEXT_END();
}
FUNCTION_LOG_RETURN(HTTP_CLIENT, result);
}

View File

@ -5,49 +5,14 @@ HTTP Client
#include "common/debug.h" #include "common/debug.h"
#include "common/io/http/client.h" #include "common/io/http/client.h"
#include "common/io/http/common.h"
#include "common/io/io.h"
#include "common/io/read.intern.h"
#include "common/io/tls/client.h" #include "common/io/tls/client.h"
#include "common/log.h" #include "common/log.h"
#include "common/type/object.h" #include "common/type/object.h"
#include "common/wait.h"
/***********************************************************************************************************************************
Http constants
***********************************************************************************************************************************/
#define HTTP_VERSION "HTTP/1.1"
STRING_STATIC(HTTP_VERSION_STR, HTTP_VERSION);
STRING_EXTERN(HTTP_VERB_DELETE_STR, HTTP_VERB_DELETE);
STRING_EXTERN(HTTP_VERB_GET_STR, HTTP_VERB_GET);
STRING_EXTERN(HTTP_VERB_HEAD_STR, HTTP_VERB_HEAD);
STRING_EXTERN(HTTP_VERB_POST_STR, HTTP_VERB_POST);
STRING_EXTERN(HTTP_VERB_PUT_STR, HTTP_VERB_PUT);
STRING_EXTERN(HTTP_HEADER_AUTHORIZATION_STR, HTTP_HEADER_AUTHORIZATION);
#define HTTP_HEADER_CONNECTION "connection"
STRING_STATIC(HTTP_HEADER_CONNECTION_STR, HTTP_HEADER_CONNECTION);
STRING_EXTERN(HTTP_HEADER_CONTENT_LENGTH_STR, HTTP_HEADER_CONTENT_LENGTH);
STRING_EXTERN(HTTP_HEADER_CONTENT_MD5_STR, HTTP_HEADER_CONTENT_MD5);
STRING_EXTERN(HTTP_HEADER_ETAG_STR, HTTP_HEADER_ETAG);
STRING_EXTERN(HTTP_HEADER_HOST_STR, HTTP_HEADER_HOST);
STRING_EXTERN(HTTP_HEADER_LAST_MODIFIED_STR, HTTP_HEADER_LAST_MODIFIED);
#define HTTP_HEADER_TRANSFER_ENCODING "transfer-encoding"
STRING_STATIC(HTTP_HEADER_TRANSFER_ENCODING_STR, HTTP_HEADER_TRANSFER_ENCODING);
#define HTTP_VALUE_CONNECTION_CLOSE "close"
STRING_STATIC(HTTP_VALUE_CONNECTION_CLOSE_STR, HTTP_VALUE_CONNECTION_CLOSE);
#define HTTP_VALUE_TRANSFER_ENCODING_CHUNKED "chunked"
STRING_STATIC(HTTP_VALUE_TRANSFER_ENCODING_CHUNKED_STR, HTTP_VALUE_TRANSFER_ENCODING_CHUNKED);
// 5xx errors that should always be retried
#define HTTP_RESPONSE_CODE_RETRY_CLASS 5
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Statistics Statistics
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
static HttpClientStat httpClientStatLocal; HttpClientStat httpClientStat;
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Object type Object type
@ -56,138 +21,12 @@ struct HttpClient
{ {
MemContext *memContext; // Mem context MemContext *memContext; // Mem context
TimeMSec timeout; // Request timeout TimeMSec timeout; // Request timeout
TlsClient *tlsClient; // TLS client TlsClient *tlsClient; // TLS client
TlsSession *tlsSession; // Current TLS session
IoRead *ioRead; // Read io interface
unsigned int responseCode; // Response code (e.g. 200, 404) List *sessionReuseList; // List of HTTP sessions that can be reused
String *responseMessage; // Response message e.g. (OK, Not Found)
HttpHeader *responseHeader; // Response headers
bool contentChunked; // Is the response content chunked?
uint64_t contentSize; // Content size (ignored for chunked)
uint64_t contentRemaining; // Content remaining (per chunk if chunked)
bool closeOnContentEof; // Will server close after content is sent?
bool contentEof; // Has all content been read?
}; };
OBJECT_DEFINE_FREE(HTTP_CLIENT); OBJECT_DEFINE_GET(Timeout, const, HTTP_CLIENT, TimeMSec, timeout);
/***********************************************************************************************************************************
Read content
***********************************************************************************************************************************/
static size_t
httpClientRead(THIS_VOID, Buffer *buffer, bool block)
{
THIS(HttpClient);
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(HTTP_CLIENT, this);
FUNCTION_LOG_PARAM(BUFFER, buffer);
FUNCTION_LOG_PARAM(BOOL, block);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(buffer != NULL);
ASSERT(!bufFull(buffer));
// Read if EOF has not been reached
size_t actualBytes = 0;
if (!this->contentEof)
{
// If close was requested and no content specified then the server may send content up until the eof
if (this->closeOnContentEof && !this->contentChunked && this->contentSize == 0)
{
ioRead(tlsSessionIoRead(this->tlsSession), buffer);
this->contentEof = ioReadEof(tlsSessionIoRead(this->tlsSession));
}
// Else read using specified encoding or size
else
{
do
{
// If chunked content and no content remaining
if (this->contentChunked && this->contentRemaining == 0)
{
// Read length of next chunk
MEM_CONTEXT_TEMP_BEGIN()
{
this->contentRemaining = cvtZToUInt64Base(
strPtr(strTrim(ioReadLine(tlsSessionIoRead(this->tlsSession)))), 16);
}
MEM_CONTEXT_TEMP_END();
// If content remaining is still zero then eof
if (this->contentRemaining == 0)
this->contentEof = true;
}
// Read if there is content remaining
if (this->contentRemaining > 0)
{
// If the buffer is larger than the content that needs to be read then limit the buffer size so the read won't
// block or read too far. Casting to size_t is safe on 32-bit because we know the max buffer size is defined as
// less than 2^32 so content remaining can't be more than that.
if (bufRemains(buffer) > this->contentRemaining)
bufLimitSet(buffer, bufSize(buffer) - (bufRemains(buffer) - (size_t)this->contentRemaining));
actualBytes = bufRemains(buffer);
this->contentRemaining -= ioRead(tlsSessionIoRead(this->tlsSession), buffer);
// Error if EOF but content read is not complete
if (ioReadEof(tlsSessionIoRead(this->tlsSession)))
THROW(FileReadError, "unexpected EOF reading HTTP content");
// Clear limit (this works even if the limit was not set and it is easier than checking)
bufLimitClear(buffer);
}
// If no content remaining
if (this->contentRemaining == 0)
{
// If chunked then consume the blank line that follows every chunk. There might be more chunk data so loop back
// around to check.
if (this->contentChunked)
{
ioReadLine(tlsSessionIoRead(this->tlsSession));
}
// If total content size was provided then this is eof
else
this->contentEof = true;
}
}
while (!bufFull(buffer) && !this->contentEof);
}
// If the server notified that it would close the connection after sending content then close the client side
if (this->contentEof && this->closeOnContentEof)
{
tlsSessionFree(this->tlsSession);
this->tlsSession = NULL;
}
}
FUNCTION_LOG_RETURN(SIZE, (size_t)actualBytes);
}
/***********************************************************************************************************************************
Has all content been read?
***********************************************************************************************************************************/
static bool
httpClientEof(THIS_VOID)
{
THIS(HttpClient);
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(HTTP_CLIENT, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
FUNCTION_LOG_RETURN(BOOL, this->contentEof);
}
/**********************************************************************************************************************************/ /**********************************************************************************************************************************/
HttpClient * HttpClient *
@ -216,9 +55,10 @@ httpClientNew(
.memContext = MEM_CONTEXT_NEW(), .memContext = MEM_CONTEXT_NEW(),
.timeout = timeout, .timeout = timeout,
.tlsClient = tlsClientNew(sckClientNew(host, port, timeout), timeout, verifyPeer, caFile, caPath), .tlsClient = tlsClientNew(sckClientNew(host, port, timeout), timeout, verifyPeer, caFile, caPath),
.sessionReuseList = lstNew(sizeof(HttpSession *)),
}; };
httpClientStatLocal.object++; httpClientStat.object++;
} }
MEM_CONTEXT_NEW_END(); MEM_CONTEXT_NEW_END();
@ -226,273 +66,53 @@ httpClientNew(
} }
/**********************************************************************************************************************************/ /**********************************************************************************************************************************/
Buffer * HttpSession *
httpClientRequest( httpClientOpen(HttpClient *this)
HttpClient *this, const String *verb, const String *uri, const HttpQuery *query, const HttpHeader *requestHeader,
const Buffer *body, bool returnContent)
{ {
FUNCTION_LOG_BEGIN(logLevelDebug) FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(HTTP_CLIENT, this); FUNCTION_LOG_PARAM(HTTP_CLIENT, this);
FUNCTION_LOG_PARAM(STRING, verb);
FUNCTION_LOG_PARAM(STRING, uri);
FUNCTION_LOG_PARAM(HTTP_QUERY, query);
FUNCTION_LOG_PARAM(HTTP_HEADER, requestHeader);
FUNCTION_LOG_PARAM(BUFFER, body);
FUNCTION_LOG_PARAM(BOOL, returnContent);
FUNCTION_LOG_END(); FUNCTION_LOG_END();
ASSERT(this != NULL); ASSERT(this != NULL);
ASSERT(verb != NULL);
ASSERT(uri != NULL);
// Buffer for returned content HttpSession *result = NULL;
Buffer *result = NULL;
MEM_CONTEXT_TEMP_BEGIN() // Check if there is a resuable session
if (lstSize(this->sessionReuseList) > 0)
{ {
bool retry; // Remove session from reusable list
Wait *wait = waitNew(this->timeout); result = *(HttpSession **)lstGet(this->sessionReuseList, 0);
lstRemoveIdx(this->sessionReuseList, 0);
do // Move session to the calling context
{ httpSessionMove(result, memContextCurrent());
// Assume there will be no retry }
retry = false; // Else create a new session
else
// Free the read interface {
httpClientDone(this); result = httpSessionNew(this, tlsClientOpen(this->tlsClient));
httpClientStat.session++;
// Free response status left over from the last request
httpHeaderFree(this->responseHeader);
this->responseHeader = NULL;
strFree(this->responseMessage);
this->responseMessage = NULL;
// Reset all content info
this->contentChunked = false;
this->contentSize = 0;
this->contentRemaining = 0;
this->closeOnContentEof = false;
this->contentEof = true;
TRY_BEGIN()
{
if (this->tlsSession == NULL)
{
MEM_CONTEXT_BEGIN(this->memContext)
{
this->tlsSession = tlsClientOpen(this->tlsClient);
httpClientStatLocal.session++;
}
MEM_CONTEXT_END();
}
// Write the request
String *queryStr = httpQueryRender(query);
ioWriteStrLine(
tlsSessionIoWrite(this->tlsSession),
strNewFmt(
"%s %s%s%s " HTTP_VERSION "\r", strPtr(verb), strPtr(httpUriEncode(uri, true)), queryStr == NULL ? "" : "?",
queryStr == NULL ? "" : strPtr(queryStr)));
// Write headers
if (requestHeader != NULL)
{
const StringList *headerList = httpHeaderList(requestHeader);
for (unsigned int headerIdx = 0; headerIdx < strLstSize(headerList); headerIdx++)
{
const String *headerKey = strLstGet(headerList, headerIdx);
ioWriteStrLine(
tlsSessionIoWrite(this->tlsSession),
strNewFmt("%s:%s\r", strPtr(headerKey), strPtr(httpHeaderGet(requestHeader, headerKey))));
}
}
// Write out blank line to end the headers
ioWriteLine(tlsSessionIoWrite(this->tlsSession), CR_BUF);
// Write out body if any
if (body != NULL)
ioWrite(tlsSessionIoWrite(this->tlsSession), body);
// Flush all writes
ioWriteFlush(tlsSessionIoWrite(this->tlsSession));
// Read status
String *status = ioReadLine(tlsSessionIoRead(this->tlsSession));
// Check status ends with a CR and remove it to make error formatting easier and more accurate
if (!strEndsWith(status, CR_STR))
THROW_FMT(FormatError, "HTTP response status '%s' should be CR-terminated", strPtr(status));
status = strSubN(status, 0, strSize(status) - 1);
// Check status is at least the minimum required length to avoid harder to interpret errors later on
if (strSize(status) < sizeof(HTTP_VERSION) + 4)
THROW_FMT(FormatError, "HTTP response '%s' has invalid length", strPtr(strTrim(status)));
// Check status starts with the correct http version
if (!strBeginsWith(status, HTTP_VERSION_STR))
THROW_FMT(FormatError, "HTTP version of response '%s' must be " HTTP_VERSION, strPtr(status));
// Read status code
status = strSub(status, sizeof(HTTP_VERSION));
int spacePos = strChr(status, ' ');
if (spacePos != 3)
THROW_FMT(FormatError, "response status '%s' must have a space after the status code", strPtr(status));
this->responseCode = cvtZToUInt(strPtr(strSubN(status, 0, (size_t)spacePos)));
// Read reason phrase. A missing reason phrase will be represented as an empty string.
MEM_CONTEXT_BEGIN(this->memContext)
{
this->responseMessage = strSub(status, (size_t)spacePos + 1);
}
MEM_CONTEXT_END();
// Read headers
MEM_CONTEXT_BEGIN(this->memContext)
{
this->responseHeader = httpHeaderNew(NULL);
}
MEM_CONTEXT_END();
do
{
// Read the next header
String *header = strTrim(ioReadLine(tlsSessionIoRead(this->tlsSession)));
// If the header is empty then we have reached the end of the headers
if (strSize(header) == 0)
break;
// Split the header and store it
int colonPos = strChr(header, ':');
if (colonPos < 0)
THROW_FMT(FormatError, "header '%s' missing colon", strPtr(strTrim(header)));
String *headerKey = strLower(strTrim(strSubN(header, 0, (size_t)colonPos)));
String *headerValue = strTrim(strSub(header, (size_t)colonPos + 1));
httpHeaderAdd(this->responseHeader, headerKey, headerValue);
// Read transfer encoding (only chunked is supported)
if (strEq(headerKey, HTTP_HEADER_TRANSFER_ENCODING_STR))
{
// Error if transfer encoding is not chunked
if (!strEq(headerValue, HTTP_VALUE_TRANSFER_ENCODING_CHUNKED_STR))
{
THROW_FMT(
FormatError, "only '%s' is supported for '%s' header", HTTP_VALUE_TRANSFER_ENCODING_CHUNKED,
HTTP_HEADER_TRANSFER_ENCODING);
}
this->contentChunked = true;
}
// Read content size
if (strEq(headerKey, HTTP_HEADER_CONTENT_LENGTH_STR))
{
this->contentSize = cvtZToUInt64(strPtr(headerValue));
this->contentRemaining = this->contentSize;
}
// If the server notified of a closed connection then close the client connection after reading content. This
// prevents doing a retry on the next request when using the closed connection.
if (strEq(headerKey, HTTP_HEADER_CONNECTION_STR) && strEq(headerValue, HTTP_VALUE_CONNECTION_CLOSE_STR))
{
this->closeOnContentEof = true;
httpClientStatLocal.close++;
}
}
while (1);
// Error if transfer encoding and content length are both set
if (this->contentChunked && this->contentSize > 0)
{
THROW_FMT(
FormatError, "'%s' and '%s' headers are both set", HTTP_HEADER_TRANSFER_ENCODING,
HTTP_HEADER_CONTENT_LENGTH);
}
// Was content returned in the response? HEAD will report content but not actually return any.
bool contentExists =
(this->contentChunked || this->contentSize > 0 || this->closeOnContentEof) && !strEq(verb, HTTP_VERB_HEAD_STR);
this->contentEof = !contentExists;
// If all content should be returned from this function then read the buffer. Also read the response if there has
// been an error.
if (returnContent || !httpClientResponseCodeOk(this))
{
if (contentExists)
{
result = bufNew(0);
do
{
bufResize(result, bufSize(result) + ioBufferSize());
httpClientRead(this, result, true);
}
while (!httpClientEof(this));
}
}
// Else create an io object, even if there is no content. This makes the logic for readers easier -- they can just
// check eof rather than also checking if the io object exists.
else
{
MEM_CONTEXT_BEGIN(this->memContext)
{
this->ioRead = ioReadNewP(this, .eof = httpClientEof, .read = httpClientRead);
ioReadOpen(this->ioRead);
}
MEM_CONTEXT_END();
}
// If the server notified that it would close the connection and there is no content then close the client side
if (this->closeOnContentEof && !contentExists)
{
tlsSessionFree(this->tlsSession);
this->tlsSession = NULL;
}
// Retry when response code is 5xx. These errors generally represent a server error for a request that looks valid.
// There are a few errors that might be permanently fatal but they are rare and it seems best not to try and pick
// and choose errors in this class to retry.
if (httpClientResponseCode(this) / 100 == HTTP_RESPONSE_CODE_RETRY_CLASS)
THROW_FMT(ServiceError, "[%u] %s", httpClientResponseCode(this), strPtr(httpClientResponseMessage(this)));
}
CATCH_ANY()
{
tlsSessionFree(this->tlsSession);
this->tlsSession = NULL;
// Retry if wait time has not expired
if (waitMore(wait))
{
LOG_DEBUG_FMT("retry %s: %s", errorTypeName(errorType()), errorMessage());
retry = true;
httpClientStatLocal.retry++;
}
else
RETHROW();
}
TRY_END();
}
while (retry);
// Move the result buffer (if any) to the parent context
bufMove(result, memContextPrior());
httpClientStatLocal.request++;
} }
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN(BUFFER, result); FUNCTION_LOG_RETURN(HTTP_SESSION, result);
}
/**********************************************************************************************************************************/
void
httpClientReuse(HttpClient *this, HttpSession *session)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(HTTP_CLIENT, this);
FUNCTION_LOG_PARAM(HTTP_SESSION, session);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(session != NULL);
httpSessionMove(session, lstMemContext(this->sessionReuseList));
lstAdd(this->sessionReuseList, &session);
FUNCTION_LOG_RETURN_VOID();
} }
/**********************************************************************************************************************************/ /**********************************************************************************************************************************/
@ -503,118 +123,13 @@ httpClientStatStr(void)
String *result = NULL; String *result = NULL;
if (httpClientStatLocal.object > 0) if (httpClientStat.object > 0)
{ {
result = strNewFmt( result = strNewFmt(
"http statistics: objects %" PRIu64 ", sessions %" PRIu64 ", requests %" PRIu64 ", retries %" PRIu64 "http statistics: objects %" PRIu64 ", sessions %" PRIu64 ", requests %" PRIu64 ", retries %" PRIu64
", closes %" PRIu64, ", closes %" PRIu64,
httpClientStatLocal.object, httpClientStatLocal.session, httpClientStatLocal.request, httpClientStatLocal.retry, httpClientStat.object, httpClientStat.session, httpClientStat.request, httpClientStat.retry, httpClientStat.close);
httpClientStatLocal.close);
} }
FUNCTION_TEST_RETURN(result); FUNCTION_TEST_RETURN(result);
} }
/**********************************************************************************************************************************/
void
httpClientDone(HttpClient *this)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(HTTP_CLIENT, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
if (this->ioRead != NULL)
{
// If it looks like we were in the middle of a response then close the TLS session so we can start clean next time
if (!this->contentEof)
{
tlsSessionFree(this->tlsSession);
this->tlsSession = NULL;
}
ioReadFree(this->ioRead);
this->ioRead = NULL;
}
FUNCTION_LOG_RETURN_VOID();
}
/**********************************************************************************************************************************/
bool
httpClientBusy(const HttpClient *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_CLIENT, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->ioRead);
}
/**********************************************************************************************************************************/
IoRead *
httpClientIoRead(const HttpClient *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_CLIENT, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->ioRead);
}
/**********************************************************************************************************************************/
unsigned int
httpClientResponseCode(const HttpClient *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_CLIENT, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->responseCode);
}
/**********************************************************************************************************************************/
bool
httpClientResponseCodeOk(const HttpClient *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_CLIENT, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->responseCode / 100 == 2);
}
/**********************************************************************************************************************************/
const HttpHeader *
httpClientResponseHeader(const HttpClient *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_CLIENT, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->responseHeader);
}
/**********************************************************************************************************************************/
const String *
httpClientResponseMessage(const HttpClient *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_CLIENT, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->responseMessage);
}

View File

@ -4,9 +4,14 @@ HTTP Client
A robust HTTP client with connection reuse and automatic retries. A robust HTTP client with connection reuse and automatic retries.
Using a single object to make multiple requests is more efficient because connections are reused whenever possible. Requests are Using a single object to make multiple requests is more efficient because connections are reused whenever possible. Requests are
automatically retried when the connection has been closed by the server. Any 5xx response is also retried. automatically retried when the connection has been closed by the server. Any 5xx response is also retried.
Only the HTTPS protocol is currently supported. Only the HTTPS protocol is currently supported.
IMPORTANT NOTE: HttpClient should have a longer lifetime than any active HttpSession objects. This does not apply to HttpSession
objects that are freed, i.e. if an error occurs it does not matter in what order HttpClient and HttpSession objects are destroyed,
or HttpSession objects that have been returned to the client with httpClientReuse(). The danger is when an active HttpResponse
completes and tries to call httpClientReuse() on an HttpClient that has been freed thus causing a segfault.
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
#ifndef COMMON_IO_HTTP_CLIENT_H #ifndef COMMON_IO_HTTP_CLIENT_H
#define COMMON_IO_HTTP_CLIENT_H #define COMMON_IO_HTTP_CLIENT_H
@ -19,41 +24,8 @@ Object type
typedef struct HttpClient HttpClient; typedef struct HttpClient HttpClient;
#include "common/io/http/header.h" #include "common/io/http/session.h"
#include "common/io/http/query.h"
#include "common/io/read.h"
#include "common/time.h" #include "common/time.h"
#include "common/type/stringList.h"
/***********************************************************************************************************************************
HTTP Constants
***********************************************************************************************************************************/
#define HTTP_VERB_DELETE "DELETE"
STRING_DECLARE(HTTP_VERB_DELETE_STR);
#define HTTP_VERB_GET "GET"
STRING_DECLARE(HTTP_VERB_GET_STR);
#define HTTP_VERB_HEAD "HEAD"
STRING_DECLARE(HTTP_VERB_HEAD_STR);
#define HTTP_VERB_POST "POST"
STRING_DECLARE(HTTP_VERB_POST_STR);
#define HTTP_VERB_PUT "PUT"
STRING_DECLARE(HTTP_VERB_PUT_STR);
#define HTTP_HEADER_AUTHORIZATION "authorization"
STRING_DECLARE(HTTP_HEADER_AUTHORIZATION_STR);
#define HTTP_HEADER_CONTENT_LENGTH "content-length"
STRING_DECLARE(HTTP_HEADER_CONTENT_LENGTH_STR);
#define HTTP_HEADER_CONTENT_MD5 "content-md5"
STRING_DECLARE(HTTP_HEADER_CONTENT_MD5_STR);
#define HTTP_HEADER_ETAG "etag"
STRING_DECLARE(HTTP_HEADER_ETAG_STR);
#define HTTP_HEADER_HOST "host"
STRING_DECLARE(HTTP_HEADER_HOST_STR);
#define HTTP_HEADER_LAST_MODIFIED "last-modified"
STRING_DECLARE(HTTP_HEADER_LAST_MODIFIED_STR);
#define HTTP_RESPONSE_CODE_FORBIDDEN 403
#define HTTP_RESPONSE_CODE_NOT_FOUND 404
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Statistics Statistics
@ -62,11 +34,13 @@ typedef struct HttpClientStat
{ {
uint64_t object; // Objects created uint64_t object; // Objects created
uint64_t session; // TLS sessions created uint64_t session; // TLS sessions created
uint64_t request; // Requests (i.e. calls to httpClientRequest()) uint64_t request; // Requests (i.e. calls to httpRequestNew())
uint64_t retry; // Request retries uint64_t retry; // Request retries
uint64_t close; // Closes forced by server uint64_t close; // Closes forced by server
} HttpClientStat; } HttpClientStat;
extern HttpClientStat httpClientStat;
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Constructors Constructors
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
@ -76,19 +50,11 @@ HttpClient *httpClientNew(
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Functions Functions
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
// Is the HTTP object busy? // Open a new session
bool httpClientBusy(const HttpClient *this); HttpSession *httpClientOpen(HttpClient *this);
// Mark the client as done if read is complete // Request/response finished cleanly so session can be reused
void httpClientDone(HttpClient *this); void httpClientReuse(HttpClient *this, HttpSession *session);
// Perform a request
Buffer *httpClientRequest(
HttpClient *this, const String *verb, const String *uri, const HttpQuery *query, const HttpHeader *requestHeader,
const Buffer *body, bool returnContent);
// Is this response code OK, i.e. 2XX?
bool httpClientResponseCodeOk(const HttpClient *this);
// Format statistics to a string // Format statistics to a string
String *httpClientStatStr(void); String *httpClientStatStr(void);
@ -96,22 +62,7 @@ String *httpClientStatStr(void);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Getters/Setters Getters/Setters
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
// Read interface TimeMSec httpClientTimeout(const HttpClient *this);
IoRead *httpClientIoRead(const HttpClient *this);
// Get the response code
unsigned int httpClientResponseCode(const HttpClient *this);
// Response headers
const HttpHeader *httpClientResponseHeader(const HttpClient *this);
// Response message
const String *httpClientResponseMessage(const HttpClient *this);
/***********************************************************************************************************************************
Destructor
***********************************************************************************************************************************/
void httpClientFree(HttpClient *this);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Macros for function logging Macros for function logging

View File

@ -46,6 +46,35 @@ httpQueryNew(void)
FUNCTION_TEST_RETURN(this); FUNCTION_TEST_RETURN(this);
} }
/**********************************************************************************************************************************/
HttpQuery *
httpQueryDup(const HttpQuery *query)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_QUERY, query);
FUNCTION_TEST_END();
HttpQuery *this = NULL;
if (query != NULL)
{
MEM_CONTEXT_NEW_BEGIN("HttpQuery")
{
// Allocate state and set context
this = memNew(sizeof(HttpQuery));
*this = (HttpQuery)
{
.memContext = MEM_CONTEXT_NEW(),
.kv = kvDup(query->kv),
};
}
MEM_CONTEXT_NEW_END();
}
FUNCTION_TEST_RETURN(this);
}
/**********************************************************************************************************************************/ /**********************************************************************************************************************************/
HttpQuery * HttpQuery *
httpQueryAdd(HttpQuery *this, const String *key, const String *value) httpQueryAdd(HttpQuery *this, const String *key, const String *value)

View File

@ -20,6 +20,7 @@ typedef struct HttpQuery HttpQuery;
Constructors Constructors
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
HttpQuery *httpQueryNew(void); HttpQuery *httpQueryNew(void);
HttpQuery *httpQueryDup(const HttpQuery *query);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Functions Functions

View File

@ -0,0 +1,313 @@
/***********************************************************************************************************************************
HTTP Request
***********************************************************************************************************************************/
#include "build.auto.h"
#include "common/debug.h"
#include "common/io/http/common.h"
#include "common/io/http/request.h"
#include "common/log.h"
#include "common/type/object.h"
#include "common/wait.h"
/***********************************************************************************************************************************
HTTP constants
***********************************************************************************************************************************/
STRING_EXTERN(HTTP_VERSION_STR, HTTP_VERSION);
STRING_EXTERN(HTTP_VERB_DELETE_STR, HTTP_VERB_DELETE);
STRING_EXTERN(HTTP_VERB_GET_STR, HTTP_VERB_GET);
STRING_EXTERN(HTTP_VERB_HEAD_STR, HTTP_VERB_HEAD);
STRING_EXTERN(HTTP_VERB_POST_STR, HTTP_VERB_POST);
STRING_EXTERN(HTTP_VERB_PUT_STR, HTTP_VERB_PUT);
STRING_EXTERN(HTTP_HEADER_AUTHORIZATION_STR, HTTP_HEADER_AUTHORIZATION);
STRING_EXTERN(HTTP_HEADER_CONTENT_LENGTH_STR, HTTP_HEADER_CONTENT_LENGTH);
STRING_EXTERN(HTTP_HEADER_CONTENT_MD5_STR, HTTP_HEADER_CONTENT_MD5);
STRING_EXTERN(HTTP_HEADER_ETAG_STR, HTTP_HEADER_ETAG);
STRING_EXTERN(HTTP_HEADER_HOST_STR, HTTP_HEADER_HOST);
STRING_EXTERN(HTTP_HEADER_LAST_MODIFIED_STR, HTTP_HEADER_LAST_MODIFIED);
// 5xx errors that should always be retried
#define HTTP_RESPONSE_CODE_RETRY_CLASS 5
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
struct HttpRequest
{
MemContext *memContext; // Mem context
HttpClient *client; // HTTP client
const String *verb; // HTTP verb (GET, POST, etc.)
const String *uri; // HTTP URI
const HttpQuery *query; // HTTP query
const HttpHeader *header; // HTTP headers
const Buffer *content; // HTTP content
HttpSession *session; // Session for async requests
};
OBJECT_DEFINE_MOVE(HTTP_REQUEST);
OBJECT_DEFINE_FREE(HTTP_REQUEST);
OBJECT_DEFINE_GET(Verb, const, HTTP_REQUEST, const String *, verb);
OBJECT_DEFINE_GET(Uri, const, HTTP_REQUEST, const String *, uri);
OBJECT_DEFINE_GET(Query, const, HTTP_REQUEST, const HttpQuery *, query);
OBJECT_DEFINE_GET(Header, const, HTTP_REQUEST, const HttpHeader *, header);
/***********************************************************************************************************************************
Process the request
***********************************************************************************************************************************/
static HttpResponse *
httpRequestProcess(HttpRequest *this, bool requestOnly, bool contentCache)
{
FUNCTION_LOG_BEGIN(logLevelDebug)
FUNCTION_LOG_PARAM(HTTP_REQUEST, this);
FUNCTION_LOG_PARAM(BOOL, requestOnly);
FUNCTION_LOG_PARAM(BOOL, contentCache);
FUNCTION_LOG_END();
ASSERT(this != NULL);
// HTTP Response
HttpResponse *result = NULL;
MEM_CONTEXT_TEMP_BEGIN()
{
bool retry;
Wait *wait = waitNew(httpClientTimeout(this->client));
do
{
// Assume there will be no retry
retry = false;
TRY_BEGIN()
{
MEM_CONTEXT_TEMP_BEGIN()
{
HttpSession *session = NULL;
// If a session is saved then the request was already successfully sent
if (this->session != NULL)
{
session = httpSessionMove(this->session, memContextCurrent());
this->session = NULL;
}
// Else the request has not been sent yet or this is a retry
else
{
session = httpClientOpen(this->client);
// Write the request
String *queryStr = httpQueryRender(this->query);
ioWriteStrLine(
httpSessionIoWrite(session),
strNewFmt(
"%s %s%s%s " HTTP_VERSION "\r", strPtr(this->verb), strPtr(httpUriEncode(this->uri, true)),
queryStr == NULL ? "" : "?", queryStr == NULL ? "" : strPtr(queryStr)));
// Write headers
const StringList *headerList = httpHeaderList(this->header);
for (unsigned int headerIdx = 0; headerIdx < strLstSize(headerList); headerIdx++)
{
const String *headerKey = strLstGet(headerList, headerIdx);
ioWriteStrLine(
httpSessionIoWrite(session),
strNewFmt("%s:%s\r", strPtr(headerKey), strPtr(httpHeaderGet(this->header, headerKey))));
}
// Write out blank line to end the headers
ioWriteLine(httpSessionIoWrite(session), CR_BUF);
// Write out content if any
if (this->content != NULL)
ioWrite(httpSessionIoWrite(session), this->content);
// Flush all writes
ioWriteFlush(httpSessionIoWrite(session));
// If only performing the request then move the session to the object context
if (requestOnly)
this->session = httpSessionMove(session, this->memContext);
}
// Wait for response
if (!requestOnly)
{
result = httpResponseNew(session, this->verb, contentCache);
// Retry when response code is 5xx. These errors generally represent a server error for a request that
// looks valid. There are a few errors that might be permanently fatal but they are rare and it seems best
// not to try and pick and choose errors in this class to retry.
if (httpResponseCode(result) / 100 == HTTP_RESPONSE_CODE_RETRY_CLASS)
THROW_FMT(ServiceError, "[%u] %s", httpResponseCode(result), strPtr(httpResponseReason(result)));
// Move response to outer temp context
httpResponseMove(result, memContextPrior());
}
}
MEM_CONTEXT_TEMP_END();
}
CATCH_ANY()
{
// Retry if wait time has not expired
if (waitMore(wait))
{
LOG_DEBUG_FMT("retry %s: %s", errorTypeName(errorType()), errorMessage());
retry = true;
httpClientStat.retry++;
}
else
RETHROW();
}
TRY_END();
}
while (retry);
// Move response to calling context
httpResponseMove(result, memContextPrior());
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN(HTTP_RESPONSE, result);
}
/**********************************************************************************************************************************/
HttpRequest *
httpRequestNew(HttpClient *client, const String *verb, const String *uri, HttpRequestNewParam param)
{
FUNCTION_LOG_BEGIN(logLevelDebug)
FUNCTION_LOG_PARAM(HTTP_CLIENT, client);
FUNCTION_LOG_PARAM(STRING, verb);
FUNCTION_LOG_PARAM(STRING, uri);
FUNCTION_LOG_PARAM(HTTP_QUERY, param.query);
FUNCTION_LOG_PARAM(HTTP_HEADER, param.header);
FUNCTION_LOG_PARAM(BUFFER, param.content);
FUNCTION_LOG_END();
ASSERT(verb != NULL);
ASSERT(uri != NULL);
HttpRequest *this = NULL;
MEM_CONTEXT_NEW_BEGIN("HttpRequest")
{
this = memNew(sizeof(HttpRequest));
*this = (HttpRequest)
{
.memContext = MEM_CONTEXT_NEW(),
.client = client,
.verb = strDup(verb),
.uri = strDup(uri),
.query = httpQueryDup(param.query),
.header = param.header == NULL ? httpHeaderNew(NULL) : httpHeaderDup(param.header, NULL),
.content = param.content == NULL ? NULL : bufDup(param.content),
};
// Send the request
httpRequestProcess(this, true, false);
httpClientStat.request++;
}
MEM_CONTEXT_NEW_END();
FUNCTION_LOG_RETURN(HTTP_REQUEST, this);
}
/**********************************************************************************************************************************/
HttpResponse *
httpRequest(HttpRequest *this, bool contentCache)
{
FUNCTION_LOG_BEGIN(logLevelDebug)
FUNCTION_LOG_PARAM(HTTP_REQUEST, this);
FUNCTION_LOG_PARAM(BOOL, contentCache);
FUNCTION_LOG_END();
ASSERT(this != NULL);
FUNCTION_LOG_RETURN(HTTP_RESPONSE, httpRequestProcess(this, false, contentCache));
}
/**********************************************************************************************************************************/
void
httpRequestError(const HttpRequest *this, HttpResponse *response)
{
FUNCTION_LOG_BEGIN(logLevelTrace)
FUNCTION_LOG_PARAM(HTTP_REQUEST, this);
FUNCTION_LOG_PARAM(HTTP_RESPONSE, response);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(response != NULL);
// Error code
String *error = strNewFmt("HTTP request failed with %u", httpResponseCode(response));
// Add reason when present
if (strSize(httpResponseReason(response)) > 0)
strCatFmt(error, " (%s)", strPtr(httpResponseReason(response)));
// Output uri/query
strCatZ(error, ":\n*** URI/Query ***:");
strCatFmt(error, "\n%s", strPtr(httpUriEncode(this->uri, true)));
if (this->query != NULL)
strCatFmt(error, "?%s", strPtr(httpQueryRender(this->query)));
// Output request headers
const StringList *requestHeaderList = httpHeaderList(this->header);
if (strLstSize(requestHeaderList) > 0)
{
strCatZ(error, "\n*** Request Headers ***:");
for (unsigned int requestHeaderIdx = 0; requestHeaderIdx < strLstSize(requestHeaderList); requestHeaderIdx++)
{
const String *key = strLstGet(requestHeaderList, requestHeaderIdx);
strCatFmt(
error, "\n%s: %s", strPtr(key),
httpHeaderRedact(this->header, key) ? "<redacted>" : strPtr(httpHeaderGet(this->header, key)));
}
}
// Output response headers
const HttpHeader *responseHeader = httpResponseHeader(response);
const StringList *responseHeaderList = httpHeaderList(responseHeader);
if (strLstSize(responseHeaderList) > 0)
{
strCatZ(error, "\n*** Response Headers ***:");
for (unsigned int responseHeaderIdx = 0; responseHeaderIdx < strLstSize(responseHeaderList); responseHeaderIdx++)
{
const String *key = strLstGet(responseHeaderList, responseHeaderIdx);
strCatFmt(error, "\n%s: %s", strPtr(key), strPtr(httpHeaderGet(responseHeader, key)));
}
}
// Add response content, if any
if (bufUsed(httpResponseContent(response)) > 0)
{
strCatZ(error, "\n*** Response Content ***:\n");
strCat(error, strNewBuf(httpResponseContent(response)));
}
THROW(ProtocolError, strPtr(error));
}
/**********************************************************************************************************************************/
String *
httpRequestToLog(const HttpRequest *this)
{
return strNewFmt(
"{verb: %s, uri: %s, query: %s, header: %s, contentSize: %zu",
strPtr(this->verb), strPtr(this->uri), this->query == NULL ? "null" : strPtr(httpQueryToLog(this->query)),
strPtr(httpHeaderToLog(this->header)), this->content == NULL ? 0 : bufUsed(this->content));
}

View File

@ -0,0 +1,111 @@
/***********************************************************************************************************************************
HTTP Request
Send a request to an HTTP server and get a response. The interface is natively asynchronous, i.e. httpRequestNew() sends a request
and httpRequest() waits for a response. These can be called together for synchronous behavior or separately for asynchronous
behavior.
***********************************************************************************************************************************/
#ifndef COMMON_IO_HTTP_REQUEST_H
#define COMMON_IO_HTTP_REQUEST_H
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
#define HTTP_REQUEST_TYPE HttpRequest
#define HTTP_REQUEST_PREFIX httpRequest
typedef struct HttpRequest HttpRequest;
#include "common/io/http/header.h"
#include "common/io/http/query.h"
#include "common/io/http/response.h"
/***********************************************************************************************************************************
HTTP Constants
***********************************************************************************************************************************/
#define HTTP_VERSION "HTTP/1.1"
STRING_DECLARE(HTTP_VERSION_STR);
#define HTTP_VERB_DELETE "DELETE"
STRING_DECLARE(HTTP_VERB_DELETE_STR);
#define HTTP_VERB_GET "GET"
STRING_DECLARE(HTTP_VERB_GET_STR);
#define HTTP_VERB_HEAD "HEAD"
STRING_DECLARE(HTTP_VERB_HEAD_STR);
#define HTTP_VERB_POST "POST"
STRING_DECLARE(HTTP_VERB_POST_STR);
#define HTTP_VERB_PUT "PUT"
STRING_DECLARE(HTTP_VERB_PUT_STR);
#define HTTP_HEADER_AUTHORIZATION "authorization"
STRING_DECLARE(HTTP_HEADER_AUTHORIZATION_STR);
#define HTTP_HEADER_CONTENT_LENGTH "content-length"
STRING_DECLARE(HTTP_HEADER_CONTENT_LENGTH_STR);
#define HTTP_HEADER_CONTENT_MD5 "content-md5"
STRING_DECLARE(HTTP_HEADER_CONTENT_MD5_STR);
#define HTTP_HEADER_ETAG "etag"
STRING_DECLARE(HTTP_HEADER_ETAG_STR);
#define HTTP_HEADER_HOST "host"
STRING_DECLARE(HTTP_HEADER_HOST_STR);
#define HTTP_HEADER_LAST_MODIFIED "last-modified"
STRING_DECLARE(HTTP_HEADER_LAST_MODIFIED_STR);
/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
typedef struct HttpRequestNewParam
{
VAR_PARAM_HEADER;
const HttpQuery *query;
const HttpHeader *header;
const Buffer *content;
} HttpRequestNewParam;
#define httpRequestNewP(client, verb, uri, ...) \
httpRequestNew(client, verb, uri, (HttpRequestNewParam){VAR_PARAM_INIT, __VA_ARGS__})
HttpRequest *httpRequestNew(HttpClient *client, const String *verb, const String *uri, HttpRequestNewParam param);
/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
// Send a request to the server
HttpResponse *httpRequest(HttpRequest *this, bool contentCache);
// Throw an error if the request failed
void httpRequestError(const HttpRequest *this, HttpResponse *response) __attribute__((__noreturn__));
// Move to a new parent mem context
HttpRequest *httpRequestMove(HttpRequest *this, MemContext *parentNew);
/***********************************************************************************************************************************
Getters/Setters
***********************************************************************************************************************************/
// Request verb
const String *httpRequestVerb(const HttpRequest *this);
// Request URI
const String *httpRequestUri(const HttpRequest *this);
// Request query
const HttpQuery *httpRequestQuery(const HttpRequest *this);
// Request headers
const HttpHeader *httpRequestHeader(const HttpRequest *this);
/***********************************************************************************************************************************
Destructor
***********************************************************************************************************************************/
void httpRequestFree(HttpRequest *this);
/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
String *httpRequestToLog(const HttpRequest *this);
#define FUNCTION_LOG_HTTP_REQUEST_TYPE \
HttpRequest *
#define FUNCTION_LOG_HTTP_REQUEST_FORMAT(value, buffer, bufferSize) \
FUNCTION_LOG_STRING_OBJECT_FORMAT(value, httpRequestToLog, buffer, bufferSize)
#endif

View File

@ -0,0 +1,415 @@
/***********************************************************************************************************************************
HTTP Response
***********************************************************************************************************************************/
#include "build.auto.h"
#include "common/debug.h"
#include "common/io/http/client.h"
#include "common/io/http/common.h"
#include "common/io/http/request.h"
#include "common/io/http/response.h"
#include "common/io/io.h"
#include "common/io/read.intern.h"
#include "common/io/tls/client.h"
#include "common/log.h"
#include "common/type/object.h"
#include "common/wait.h"
/***********************************************************************************************************************************
HTTP constants
***********************************************************************************************************************************/
#define HTTP_HEADER_CONNECTION "connection"
STRING_STATIC(HTTP_HEADER_CONNECTION_STR, HTTP_HEADER_CONNECTION);
#define HTTP_HEADER_TRANSFER_ENCODING "transfer-encoding"
STRING_STATIC(HTTP_HEADER_TRANSFER_ENCODING_STR, HTTP_HEADER_TRANSFER_ENCODING);
#define HTTP_VALUE_CONNECTION_CLOSE "close"
STRING_STATIC(HTTP_VALUE_CONNECTION_CLOSE_STR, HTTP_VALUE_CONNECTION_CLOSE);
#define HTTP_VALUE_TRANSFER_ENCODING_CHUNKED "chunked"
STRING_STATIC(HTTP_VALUE_TRANSFER_ENCODING_CHUNKED_STR, HTTP_VALUE_TRANSFER_ENCODING_CHUNKED);
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
struct HttpResponse
{
MemContext *memContext; // Mem context
HttpSession *session; // HTTP session
IoRead *contentRead; // Read interface for response content
unsigned int code; // Response code (e.g. 200, 404)
String *reason; // Response reason e.g. (OK, Not Found)
HttpHeader *header; // Response headers
bool contentChunked; // Is the response content chunked?
uint64_t contentSize; // Content size (ignored for chunked)
uint64_t contentRemaining; // Content remaining (per chunk if chunked)
bool closeOnContentEof; // Will server close after content is sent?
bool contentExists; // Does content exist?
bool contentEof; // Has all content been read?
Buffer *content; // Caches content once requested
};
OBJECT_DEFINE_MOVE(HTTP_RESPONSE);
OBJECT_DEFINE_FREE(HTTP_RESPONSE);
OBJECT_DEFINE_GET(IoRead, , HTTP_RESPONSE, IoRead *, contentRead);
OBJECT_DEFINE_GET(Code, const, HTTP_RESPONSE, unsigned int, code);
OBJECT_DEFINE_GET(Header, const, HTTP_RESPONSE, const HttpHeader *, header);
OBJECT_DEFINE_GET(Reason, const, HTTP_RESPONSE, const String *, reason);
/***********************************************************************************************************************************
When response is done close/reuse the connection
***********************************************************************************************************************************/
static void
httpResponseDone(HttpResponse *this)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(HTTP_RESPONSE, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(this->session != NULL);
// If close was requested by the server then free the session
if (this->closeOnContentEof)
{
httpSessionFree(this->session);
// Only update the close stats after a successful response so it is not counted if there was an error/retry
httpClientStat.close++;
}
// Else return it to the client so it can be reused
else
httpSessionDone(this->session);
this->session = NULL;
FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Read content
***********************************************************************************************************************************/
static size_t
httpResponseRead(THIS_VOID, Buffer *buffer, bool block)
{
THIS(HttpResponse);
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(HTTP_RESPONSE, this);
FUNCTION_LOG_PARAM(BUFFER, buffer);
FUNCTION_LOG_PARAM(BOOL, block);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(buffer != NULL);
ASSERT(!bufFull(buffer));
ASSERT(this->contentEof || this->session != NULL);
// Read if EOF has not been reached
size_t actualBytes = 0;
if (!this->contentEof)
{
MEM_CONTEXT_TEMP_BEGIN()
{
IoRead *rawRead = httpSessionIoRead(this->session);
// If close was requested and no content specified then the server may send content up until the eof
if (this->closeOnContentEof && !this->contentChunked && this->contentSize == 0)
{
ioRead(rawRead, buffer);
this->contentEof = ioReadEof(rawRead);
}
// Else read using specified encoding or size
else
{
do
{
// If chunked content and no content remaining
if (this->contentChunked && this->contentRemaining == 0)
{
// Read length of next chunk
this->contentRemaining = cvtZToUInt64Base(strPtr(strTrim(ioReadLine(rawRead))), 16);
// If content remaining is still zero then eof
if (this->contentRemaining == 0)
this->contentEof = true;
}
// Read if there is content remaining
if (this->contentRemaining > 0)
{
// If the buffer is larger than the content that needs to be read then limit the buffer size so the read
// won't block or read too far. Casting to size_t is safe on 32-bit because we know the max buffer size is
// defined as less than 2^32 so content remaining can't be more than that.
if (bufRemains(buffer) > this->contentRemaining)
bufLimitSet(buffer, bufSize(buffer) - (bufRemains(buffer) - (size_t)this->contentRemaining));
actualBytes = bufRemains(buffer);
this->contentRemaining -= ioRead(rawRead, buffer);
// Error if EOF but content read is not complete
if (ioReadEof(rawRead))
THROW(FileReadError, "unexpected EOF reading HTTP content");
// Clear limit (this works even if the limit was not set and it is easier than checking)
bufLimitClear(buffer);
}
// If no content remaining
if (this->contentRemaining == 0)
{
// If chunked then consume the blank line that follows every chunk. There might be more chunk data so loop back
// around to check.
if (this->contentChunked)
{
ioReadLine(rawRead);
}
// If total content size was provided then this is eof
else
this->contentEof = true;
}
}
while (!bufFull(buffer) && !this->contentEof);
}
// If all content has been read
if (this->contentEof)
httpResponseDone(this);
}
MEM_CONTEXT_TEMP_END();
}
FUNCTION_LOG_RETURN(SIZE, (size_t)actualBytes);
}
/***********************************************************************************************************************************
Has all content been read?
***********************************************************************************************************************************/
static bool
httpResponseEof(THIS_VOID)
{
THIS(HttpResponse);
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(HTTP_RESPONSE, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
FUNCTION_LOG_RETURN(BOOL, this->contentEof);
}
/**********************************************************************************************************************************/
HttpResponse *
httpResponseNew(HttpSession *session, const String *verb, bool contentCache)
{
FUNCTION_LOG_BEGIN(logLevelDebug)
FUNCTION_LOG_PARAM(HTTP_SESSION, session);
FUNCTION_LOG_PARAM(STRING, verb);
FUNCTION_LOG_PARAM(BOOL, contentCache);
FUNCTION_LOG_END();
ASSERT(session != NULL);
ASSERT(verb != NULL);
HttpResponse *this = NULL;
MEM_CONTEXT_NEW_BEGIN("HttpResponse")
{
this = memNew(sizeof(HttpResponse));
*this = (HttpResponse)
{
.memContext = MEM_CONTEXT_NEW(),
.session = httpSessionMove(session, memContextCurrent()),
.header = httpHeaderNew(NULL),
};
MEM_CONTEXT_TEMP_BEGIN()
{
// Read status
String *status = ioReadLine(httpSessionIoRead(this->session));
// Check status ends with a CR and remove it to make error formatting easier and more accurate
if (!strEndsWith(status, CR_STR))
THROW_FMT(FormatError, "HTTP response status '%s' should be CR-terminated", strPtr(status));
status = strSubN(status, 0, strSize(status) - 1);
// Check status is at least the minimum required length to avoid harder to interpret errors later on
if (strSize(status) < sizeof(HTTP_VERSION) + 4)
THROW_FMT(FormatError, "HTTP response '%s' has invalid length", strPtr(strTrim(status)));
// Check status starts with the correct http version
if (!strBeginsWith(status, HTTP_VERSION_STR))
THROW_FMT(FormatError, "HTTP version of response '%s' must be " HTTP_VERSION, strPtr(status));
// Read status code
status = strSub(status, sizeof(HTTP_VERSION));
int spacePos = strChr(status, ' ');
if (spacePos != 3)
THROW_FMT(FormatError, "response status '%s' must have a space after the status code", strPtr(status));
this->code = cvtZToUInt(strPtr(strSubN(status, 0, (size_t)spacePos)));
// Read reason phrase. A missing reason phrase will be represented as an empty string.
MEM_CONTEXT_BEGIN(this->memContext)
{
this->reason = strSub(status, (size_t)spacePos + 1);
}
MEM_CONTEXT_END();
// Read headers
do
{
// Read the next header
String *header = strTrim(ioReadLine(httpSessionIoRead(this->session)));
// If the header is empty then we have reached the end of the headers
if (strSize(header) == 0)
break;
// Split the header and store it
int colonPos = strChr(header, ':');
if (colonPos < 0)
THROW_FMT(FormatError, "header '%s' missing colon", strPtr(strTrim(header)));
String *headerKey = strLower(strTrim(strSubN(header, 0, (size_t)colonPos)));
String *headerValue = strTrim(strSub(header, (size_t)colonPos + 1));
httpHeaderAdd(this->header, headerKey, headerValue);
// Read transfer encoding (only chunked is supported)
if (strEq(headerKey, HTTP_HEADER_TRANSFER_ENCODING_STR))
{
// Error if transfer encoding is not chunked
if (!strEq(headerValue, HTTP_VALUE_TRANSFER_ENCODING_CHUNKED_STR))
{
THROW_FMT(
FormatError, "only '%s' is supported for '%s' header", HTTP_VALUE_TRANSFER_ENCODING_CHUNKED,
HTTP_HEADER_TRANSFER_ENCODING);
}
this->contentChunked = true;
}
// Read content size
if (strEq(headerKey, HTTP_HEADER_CONTENT_LENGTH_STR))
{
this->contentSize = cvtZToUInt64(strPtr(headerValue));
this->contentRemaining = this->contentSize;
}
// If the server notified of a closed connection then close the client connection after reading content. This
// prevents doing a retry on the next request when using the closed connection.
if (strEq(headerKey, HTTP_HEADER_CONNECTION_STR) && strEq(headerValue, HTTP_VALUE_CONNECTION_CLOSE_STR))
this->closeOnContentEof = true;
}
while (1);
// Error if transfer encoding and content length are both set
if (this->contentChunked && this->contentSize > 0)
{
THROW_FMT(
FormatError, "'%s' and '%s' headers are both set", HTTP_HEADER_TRANSFER_ENCODING,
HTTP_HEADER_CONTENT_LENGTH);
}
// Was content returned in the response? HEAD will report content but not actually return any.
this->contentExists =
(this->contentChunked || this->contentSize > 0 || this->closeOnContentEof) && !strEq(verb, HTTP_VERB_HEAD_STR);
this->contentEof = !this->contentExists;
// Create an io object, even if there is no content. This makes the logic for readers easier -- they can just check eof
// rather than also checking if the io object exists.
MEM_CONTEXT_BEGIN(this->memContext)
{
this->contentRead = ioReadNewP(this, .eof = httpResponseEof, .read = httpResponseRead);
ioReadOpen(this->contentRead);
}
MEM_CONTEXT_END();
// If there is no content then we are done with the client
if (!this->contentExists)
{
httpResponseDone(this);
}
// Else cache content when requested or on error
else if (contentCache || !httpResponseCodeOk(this))
{
MEM_CONTEXT_BEGIN(this->memContext)
{
httpResponseContent(this);
}
MEM_CONTEXT_END();
}
}
MEM_CONTEXT_TEMP_END();
}
MEM_CONTEXT_NEW_END();
FUNCTION_LOG_RETURN(HTTP_RESPONSE, this);
}
/**********************************************************************************************************************************/
const Buffer *
httpResponseContent(HttpResponse *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_RESPONSE, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
if (this->content == NULL)
{
this->content = bufNew(0);
if (this->contentExists)
{
do
{
bufResize(this->content, bufSize(this->content) + ioBufferSize());
httpResponseRead(this, this->content, true);
}
while (!httpResponseEof(this));
bufResize(this->content, bufUsed(this->content));
}
}
FUNCTION_TEST_RETURN(this->content);
}
/**********************************************************************************************************************************/
bool
httpResponseCodeOk(const HttpResponse *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_RESPONSE, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->code / 100 == 2);
}
/**********************************************************************************************************************************/
String *
httpResponseToLog(const HttpResponse *this)
{
return strNewFmt(
"{code: %u, reason: %s, header: %s, contentChunked: %s, contentSize: %" PRIu64 ", contentRemaining: %" PRIu64
", closeOnContentEof: %s, contentExists: %s, contentEof: %s, contentCached: %s",
this->code, strPtr(this->reason), strPtr(httpHeaderToLog(this->header)),
cvtBoolToConstZ(this->contentChunked), this->contentSize, this->contentRemaining, cvtBoolToConstZ(this->closeOnContentEof),
cvtBoolToConstZ(this->contentExists), cvtBoolToConstZ(this->contentEof), cvtBoolToConstZ(this->content != NULL));
}

View File

@ -0,0 +1,79 @@
/***********************************************************************************************************************************
HTTP Response
Response created after a successful request. Once the content is read the underlying connection may be recycled but the headers,
cached content, etc. will still be available for the lifetime of the object.
***********************************************************************************************************************************/
#ifndef COMMON_IO_HTTP_RESPONSE_H
#define COMMON_IO_HTTP_RESPONSE_H
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
#define HTTP_RESPONSE_TYPE HttpResponse
#define HTTP_RESPONSE_PREFIX httpResponse
typedef struct HttpResponse HttpResponse;
#include "common/io/http/header.h"
#include "common/io/http/session.h"
#include "common/io/read.h"
/***********************************************************************************************************************************
HTTP Response Constants
***********************************************************************************************************************************/
#define HTTP_RESPONSE_CODE_FORBIDDEN 403
#define HTTP_RESPONSE_CODE_NOT_FOUND 404
/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
HttpResponse *httpResponseNew(HttpSession *session, const String *verb, bool contentCache);
/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
// Is this response code OK, i.e. 2XX?
bool httpResponseCodeOk(const HttpResponse *this);
// Fetch all response content. Content will be cached so it can be retrieved again without additional cost.
const Buffer *httpResponseContent(HttpResponse *this);
// Move to a new parent mem context
HttpResponse *httpResponseMove(HttpResponse *this, MemContext *parentNew);
/***********************************************************************************************************************************
Getters/Setters
***********************************************************************************************************************************/
// Is the response still being read?
bool httpResponseBusy(const HttpResponse *this);
// Read interface used to get the response content. This is intended for reading content that may be very large and will not be held
// in memory all at once. If the content must be loaded completely for processing (e.g. XML) then httpResponseContent() is simpler.
IoRead *httpResponseIoRead(HttpResponse *this);
// Response code
unsigned int httpResponseCode(const HttpResponse *this);
// Response headers
const HttpHeader *httpResponseHeader(const HttpResponse *this);
// Response reason
const String *httpResponseReason(const HttpResponse *this);
/***********************************************************************************************************************************
Destructor
***********************************************************************************************************************************/
void httpResponseFree(HttpResponse *this);
/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
String *httpResponseToLog(const HttpResponse *this);
#define FUNCTION_LOG_HTTP_RESPONSE_TYPE \
HttpResponse *
#define FUNCTION_LOG_HTTP_RESPONSE_FORMAT(value, buffer, bufferSize) \
FUNCTION_LOG_STRING_OBJECT_FORMAT(value, httpResponseToLog, buffer, bufferSize)
#endif

View File

@ -0,0 +1,95 @@
/***********************************************************************************************************************************
HTTP Session
***********************************************************************************************************************************/
#include "build.auto.h"
#include "common/debug.h"
#include "common/io/http/session.h"
#include "common/io/io.h"
#include "common/log.h"
#include "common/memContext.h"
#include "common/type/object.h"
/***********************************************************************************************************************************
Object type
***********************************************************************************************************************************/
struct HttpSession
{
MemContext *memContext; // Mem context
HttpClient *httpClient; // HTTP client
TlsSession *tlsSession; // TLS session
};
OBJECT_DEFINE_MOVE(HTTP_SESSION);
OBJECT_DEFINE_FREE(HTTP_SESSION);
/**********************************************************************************************************************************/
HttpSession *
httpSessionNew(HttpClient *httpClient, TlsSession *tlsSession)
{
FUNCTION_LOG_BEGIN(logLevelDebug)
FUNCTION_LOG_PARAM(HTTP_CLIENT, httpClient);
FUNCTION_LOG_PARAM(TLS_SESSION, tlsSession);
FUNCTION_LOG_END();
ASSERT(httpClient != NULL);
ASSERT(tlsSession != NULL);
HttpSession *this = NULL;
MEM_CONTEXT_NEW_BEGIN("HttpSession")
{
this = memNew(sizeof(HttpSession));
*this = (HttpSession)
{
.memContext = MEM_CONTEXT_NEW(),
.httpClient = httpClient,
.tlsSession = tlsSessionMove(tlsSession, memContextCurrent()),
};
}
MEM_CONTEXT_NEW_END();
FUNCTION_LOG_RETURN(HTTP_SESSION, this);
}
/**********************************************************************************************************************************/
void
httpSessionDone(HttpSession *this)
{
FUNCTION_LOG_BEGIN(logLevelDebug)
FUNCTION_LOG_PARAM(HTTP_SESSION, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
httpClientReuse(this->httpClient, this);
FUNCTION_LOG_RETURN_VOID();
}
/**********************************************************************************************************************************/
IoRead *
httpSessionIoRead(HttpSession *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_SESSION, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(tlsSessionIoRead(this->tlsSession));
}
/**********************************************************************************************************************************/
IoWrite *
httpSessionIoWrite(HttpSession *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_SESSION, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(tlsSessionIoWrite(this->tlsSession));
}

View File

@ -1,44 +1,58 @@
/*********************************************************************************************************************************** /***********************************************************************************************************************************
HTTP Client Cache HTTP Session
Cache HTTP clients and return one that is not busy on request. HTTP sessions are created by calling httpClientOpen(), which is currently done exclusively by the HttpRequest object.
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
#ifndef COMMON_IO_HTTP_CLIENT_CACHE_H #ifndef COMMON_IO_HTTP_SESSION_H
#define COMMON_IO_HTTP_CLIENT_CACHE_H #define COMMON_IO_HTTP_SESSION_H
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Object type Object type
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
#define HTTP_CLIENT_CACHE_TYPE HttpClientCache #define HTTP_SESSION_TYPE HttpSession
#define HTTP_CLIENT_CACHE_PREFIX httpClientCache #define HTTP_SESSION_PREFIX httpSession
typedef struct HttpClientCache HttpClientCache; typedef struct HttpSession HttpSession;
#include "common/io/read.h"
#include "common/io/http/client.h" #include "common/io/http/client.h"
#include "common/io/tls/session.h"
#include "common/io/write.h"
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Constructors Constructors
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
HttpClientCache *httpClientCacheNew( HttpSession *httpSessionNew(HttpClient *client, TlsSession *session);
const String *host, unsigned int port, TimeMSec timeout, bool verifyPeer, const String *caFile, const String *caPath);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Functions Functions
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
// Get an HTTP client from the cache // Move to a new parent mem context
HttpClient *httpClientCacheGet(HttpClientCache *this); HttpSession *httpSessionMove(HttpSession *this, MemContext *parentNew);
// Work with the session has finished cleanly and it can be reused
void httpSessionDone(HttpSession *this);
/***********************************************************************************************************************************
Getters/Setters
***********************************************************************************************************************************/
// Read interface
IoRead *httpSessionIoRead(HttpSession *this);
// Write interface
IoWrite *httpSessionIoWrite(HttpSession *this);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Destructor Destructor
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
void httpClientCacheFree(HttpClientCache *this); void httpSessionFree(HttpSession *this);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Macros for function logging Macros for function logging
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
#define FUNCTION_LOG_HTTP_CLIENT_CACHE_TYPE \ #define FUNCTION_LOG_HTTP_SESSION_TYPE \
HttpClientCache * HttpSession *
#define FUNCTION_LOG_HTTP_CLIENT_CACHE_FORMAT(value, buffer, bufferSize) \ #define FUNCTION_LOG_HTTP_SESSION_FORMAT(value, buffer, bufferSize) \
objToLog(value, "HttpClientCache", buffer, bufferSize) objToLog(value, "HttpSession", buffer, bufferSize)
#endif #endif

View File

@ -27,7 +27,7 @@ typedef struct StorageReadS3
StorageReadInterface interface; // Interface StorageReadInterface interface; // Interface
StorageS3 *storage; // Storage that created this object StorageS3 *storage; // Storage that created this object
HttpClient *httpClient; // HTTP client for requests HttpResponse *httpResponse; // HTTP response
} StorageReadS3; } StorageReadS3;
/*********************************************************************************************************************************** /***********************************************************************************************************************************
@ -38,15 +38,6 @@ Macros for function logging
#define FUNCTION_LOG_STORAGE_READ_S3_FORMAT(value, buffer, bufferSize) \ #define FUNCTION_LOG_STORAGE_READ_S3_FORMAT(value, buffer, bufferSize) \
objToLog(value, "StorageReadS3", buffer, bufferSize) objToLog(value, "StorageReadS3", buffer, bufferSize)
/***********************************************************************************************************************************
Mark HTTP client as done so it can be reused
***********************************************************************************************************************************/
OBJECT_DEFINE_FREE_RESOURCE_BEGIN(STORAGE_READ_S3, LOG, logLevelTrace)
{
httpClientDone(this->httpClient);
}
OBJECT_DEFINE_FREE_RESOURCE_END(LOG);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Open the file Open the file
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
@ -60,16 +51,20 @@ storageReadS3Open(THIS_VOID)
FUNCTION_LOG_END(); FUNCTION_LOG_END();
ASSERT(this != NULL); ASSERT(this != NULL);
ASSERT(this->httpClient == NULL); ASSERT(this->httpResponse == NULL);
bool result = false; bool result = false;
// Request the file // Request the file
this->httpClient = storageS3Request(this->storage, HTTP_VERB_GET_STR, this->interface.name, NULL, NULL, false, true).httpClient; MEM_CONTEXT_BEGIN(this->memContext)
{
if (httpClientResponseCodeOk(this->httpClient)) this->httpResponse = storageS3RequestP(
this->storage, HTTP_VERB_GET_STR, this->interface.name, .allowMissing = true, .contentIo = true);
}
MEM_CONTEXT_END();
if (httpResponseCodeOk(this->httpResponse))
{ {
memContextCallbackSet(this->memContext, storageReadS3FreeResource, this);
result = true; result = true;
} }
// Else error unless ignore missing // Else error unless ignore missing
@ -93,33 +88,11 @@ storageReadS3(THIS_VOID, Buffer *buffer, bool block)
FUNCTION_LOG_PARAM(BOOL, block); FUNCTION_LOG_PARAM(BOOL, block);
FUNCTION_LOG_END(); FUNCTION_LOG_END();
ASSERT(this != NULL && this->httpClient != NULL); ASSERT(this != NULL && this->httpResponse != NULL);
ASSERT(httpClientIoRead(this->httpClient) != NULL); ASSERT(httpResponseIoRead(this->httpResponse) != NULL);
ASSERT(buffer != NULL && !bufFull(buffer)); ASSERT(buffer != NULL && !bufFull(buffer));
FUNCTION_LOG_RETURN(SIZE, ioRead(httpClientIoRead(this->httpClient), buffer)); FUNCTION_LOG_RETURN(SIZE, ioRead(httpResponseIoRead(this->httpResponse), buffer));
}
/***********************************************************************************************************************************
Close the file
***********************************************************************************************************************************/
static void
storageReadS3Close(THIS_VOID)
{
THIS(StorageReadS3);
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_READ_S3, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(this->httpClient != NULL);
memContextCallbackClear(this->memContext);
storageReadS3FreeResource(this);
this->httpClient = NULL;
FUNCTION_LOG_RETURN_VOID();
} }
/*********************************************************************************************************************************** /***********************************************************************************************************************************
@ -134,10 +107,10 @@ storageReadS3Eof(THIS_VOID)
FUNCTION_TEST_PARAM(STORAGE_READ_S3, this); FUNCTION_TEST_PARAM(STORAGE_READ_S3, this);
FUNCTION_TEST_END(); FUNCTION_TEST_END();
ASSERT(this != NULL && this->httpClient != NULL); ASSERT(this != NULL && this->httpResponse != NULL);
ASSERT(httpClientIoRead(this->httpClient) != NULL); ASSERT(httpResponseIoRead(this->httpResponse) != NULL);
FUNCTION_TEST_RETURN(ioReadEof(httpClientIoRead(this->httpClient))); FUNCTION_TEST_RETURN(ioReadEof(httpResponseIoRead(this->httpResponse)));
} }
/**********************************************************************************************************************************/ /**********************************************************************************************************************************/
@ -172,7 +145,6 @@ storageReadS3New(StorageS3 *storage, const String *name, bool ignoreMissing)
.ioInterface = (IoReadInterface) .ioInterface = (IoReadInterface)
{ {
.close = storageReadS3Close,
.eof = storageReadS3Eof, .eof = storageReadS3Eof,
.open = storageReadS3Open, .open = storageReadS3Open,
.read = storageReadS3, .read = storageReadS3,

View File

@ -8,7 +8,7 @@ S3 Storage
#include "common/crypto/hash.h" #include "common/crypto/hash.h"
#include "common/encode.h" #include "common/encode.h"
#include "common/debug.h" #include "common/debug.h"
#include "common/io/http/cache.h" #include "common/io/http/client.h"
#include "common/io/http/common.h" #include "common/io/http/common.h"
#include "common/log.h" #include "common/log.h"
#include "common/memContext.h" #include "common/memContext.h"
@ -86,7 +86,7 @@ struct StorageS3
{ {
STORAGE_COMMON_MEMBER; STORAGE_COMMON_MEMBER;
MemContext *memContext; MemContext *memContext;
HttpClientCache *httpClientCache; // HTTP client cache to service requests HttpClient *httpClient; // HTTP client to service requests
StringList *headerRedactList; // List of headers to redact from logging StringList *headerRedactList; // List of headers to redact from logging
const String *bucket; // Bucket to store data in const String *bucket; // Bucket to store data in
@ -234,77 +234,99 @@ storageS3Auth(
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Process S3 request Process S3 request
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
StorageS3RequestResult HttpRequest *
storageS3Request( storageS3RequestAsync(StorageS3 *this, const String *verb, const String *uri, StorageS3RequestAsyncParam param)
StorageS3 *this, const String *verb, const String *uri, const HttpQuery *query, const Buffer *body, bool returnContent,
bool allowMissing)
{ {
FUNCTION_LOG_BEGIN(logLevelDebug); FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(STORAGE_S3, this); FUNCTION_LOG_PARAM(STORAGE_S3, this);
FUNCTION_LOG_PARAM(STRING, verb); FUNCTION_LOG_PARAM(STRING, verb);
FUNCTION_LOG_PARAM(STRING, uri); FUNCTION_LOG_PARAM(STRING, uri);
FUNCTION_LOG_PARAM(HTTP_QUERY, query); FUNCTION_LOG_PARAM(HTTP_QUERY, param.query);
FUNCTION_LOG_PARAM(BUFFER, body); FUNCTION_LOG_PARAM(BUFFER, param.content);
FUNCTION_LOG_PARAM(BOOL, returnContent);
FUNCTION_LOG_PARAM(BOOL, allowMissing);
FUNCTION_LOG_END(); FUNCTION_LOG_END();
ASSERT(this != NULL); ASSERT(this != NULL);
ASSERT(verb != NULL); ASSERT(verb != NULL);
ASSERT(uri != NULL); ASSERT(uri != NULL);
StorageS3RequestResult result = {0}; HttpRequest *result = NULL;
MEM_CONTEXT_TEMP_BEGIN()
{
HttpHeader *requestHeader = httpHeaderNew(this->headerRedactList);
// Set content length
httpHeaderAdd(
requestHeader, HTTP_HEADER_CONTENT_LENGTH_STR,
param.content == NULL || bufUsed(param.content) == 0 ? ZERO_STR : strNewFmt("%zu", bufUsed(param.content)));
// Calculate content-md5 header if there is content
if (param.content != NULL)
{
char md5Hash[HASH_TYPE_MD5_SIZE_HEX];
encodeToStr(encodeBase64, bufPtr(cryptoHashOne(HASH_TYPE_MD5_STR, param.content)), HASH_TYPE_M5_SIZE, md5Hash);
httpHeaderAdd(requestHeader, HTTP_HEADER_CONTENT_MD5_STR, STR(md5Hash));
}
// When using path-style URIs the bucket name needs to be prepended
if (this->uriStyle == storageS3UriStylePath)
uri = strNewFmt("/%s%s", strPtr(this->bucket), strPtr(uri));
// Generate authorization header
storageS3Auth(
this, verb, httpUriEncode(uri, true), param.query, storageS3DateTime(time(NULL)), requestHeader,
param.content == NULL || bufUsed(param.content) == 0 ?
HASH_TYPE_SHA256_ZERO_STR : bufHex(cryptoHashOne(HASH_TYPE_SHA256_STR, param.content)));
// Send request
MEM_CONTEXT_PRIOR_BEGIN()
{
result = httpRequestNewP(
this->httpClient, verb, uri, .query = param.query, .header = requestHeader, .content = param.content);
}
MEM_CONTEXT_END();
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN(HTTP_REQUEST, result);
}
HttpResponse *
storageS3Response(HttpRequest *request, StorageS3ResponseParam param)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(HTTP_REQUEST, request);
FUNCTION_LOG_PARAM(BOOL, param.allowMissing);
FUNCTION_LOG_PARAM(BOOL, param.contentIo);
FUNCTION_LOG_END();
ASSERT(request != NULL);
HttpResponse *result = NULL;
unsigned int retryRemaining = 2; unsigned int retryRemaining = 2;
bool done; bool done;
// When using path-style URIs the bucket name needs to be prepended
if (this->uriStyle == storageS3UriStylePath)
uri = strNewFmt("/%s%s", strPtr(this->bucket), strPtr(uri));
do do
{ {
done = true; done = true;
MEM_CONTEXT_TEMP_BEGIN() MEM_CONTEXT_TEMP_BEGIN()
{ {
// Create header list and add content length
HttpHeader *requestHeader = httpHeaderNew(this->headerRedactList);
// Set content length
httpHeaderAdd(
requestHeader, HTTP_HEADER_CONTENT_LENGTH_STR,
body == NULL || bufUsed(body) == 0 ? ZERO_STR : strNewFmt("%zu", bufUsed(body)));
// Calculate content-md5 header if there is content
if (body != NULL)
{
char md5Hash[HASH_TYPE_MD5_SIZE_HEX];
encodeToStr(encodeBase64, bufPtr(cryptoHashOne(HASH_TYPE_MD5_STR, body)), HASH_TYPE_M5_SIZE, md5Hash);
httpHeaderAdd(requestHeader, HTTP_HEADER_CONTENT_MD5_STR, STR(md5Hash));
}
// Generate authorization header
storageS3Auth(
this, verb, httpUriEncode(uri, true), query, storageS3DateTime(time(NULL)), requestHeader,
body == NULL || bufUsed(body) == 0 ? HASH_TYPE_SHA256_ZERO_STR : bufHex(cryptoHashOne(HASH_TYPE_SHA256_STR, body)));
// Get an HTTP client
HttpClient *httpClient = httpClientCacheGet(this->httpClientCache);
// Process request // Process request
Buffer *response = httpClientRequest(httpClient, verb, uri, query, requestHeader, body, returnContent); result = httpRequest(request, !param.contentIo);
// Error if the request was not successful // Error if the request was not successful
if (!httpClientResponseCodeOk(httpClient) && if (!httpResponseCodeOk(result) && (!param.allowMissing || httpResponseCode(result) != HTTP_RESPONSE_CODE_NOT_FOUND))
(!allowMissing || httpClientResponseCode(httpClient) != HTTP_RESPONSE_CODE_NOT_FOUND))
{ {
// If there are retries remaining and a response parse it as XML to extract the S3 error code // If there are retries remaining and a response parse it as XML to extract the S3 error code
if (response != NULL && retryRemaining > 0) const Buffer *content = httpResponseContent(result);
if (bufUsed(content) > 0 && retryRemaining > 0)
{ {
// Attempt to parse the XML and extract the S3 error code // Attempt to parse the XML and extract the S3 error code
TRY_BEGIN() TRY_BEGIN()
{ {
XmlNode *error = xmlDocumentRoot(xmlDocumentNewBuf(response)); XmlNode *error = xmlDocumentRoot(xmlDocumentNewBuf(content));
const String *errorCode = xmlNodeContent(xmlNodeChild(error, S3_XML_TAG_CODE_STR, true)); const String *errorCode = xmlNodeContent(xmlNodeChild(error, S3_XML_TAG_CODE_STR, true));
if (strEq(errorCode, S3_ERROR_REQUEST_TIME_TOO_SKEWED_STR)) if (strEq(errorCode, S3_ERROR_REQUEST_TIME_TOO_SKEWED_STR))
@ -324,75 +346,38 @@ storageS3Request(
TRY_END(); TRY_END();
} }
// If not done then retry instead of reporting the error // If done throw the error
if (done) if (done)
{ httpRequestError(request, result);
// General error message
String *error = strNewFmt(
"S3 request failed with %u: %s", httpClientResponseCode(httpClient),
strPtr(httpClientResponseMessage(httpClient)));
// Output uri/query
strCatZ(error, "\n*** URI/Query ***:");
strCatFmt(error, "\n%s", strPtr(httpUriEncode(uri, true)));
if (query != NULL)
strCatFmt(error, "?%s", strPtr(httpQueryRender(query)));
// Output request headers
const StringList *requestHeaderList = httpHeaderList(requestHeader);
strCatZ(error, "\n*** Request Headers ***:");
for (unsigned int requestHeaderIdx = 0; requestHeaderIdx < strLstSize(requestHeaderList); requestHeaderIdx++)
{
const String *key = strLstGet(requestHeaderList, requestHeaderIdx);
strCatFmt(
error, "\n%s: %s", strPtr(key),
httpHeaderRedact(requestHeader, key) || strEq(key, S3_HEADER_DATE_STR) ?
"<redacted>" : strPtr(httpHeaderGet(requestHeader, key)));
}
// Output response headers
const HttpHeader *responseHeader = httpClientResponseHeader(httpClient);
const StringList *responseHeaderList = httpHeaderList(responseHeader);
if (strLstSize(responseHeaderList) > 0)
{
strCatZ(error, "\n*** Response Headers ***:");
for (unsigned int responseHeaderIdx = 0; responseHeaderIdx < strLstSize(responseHeaderList);
responseHeaderIdx++)
{
const String *key = strLstGet(responseHeaderList, responseHeaderIdx);
strCatFmt(error, "\n%s: %s", strPtr(key), strPtr(httpHeaderGet(responseHeader, key)));
}
}
// If there was content then output it
if (response!= NULL)
strCatFmt(error, "\n*** Response Content ***:\n%s", strPtr(strNewBuf(response)));
THROW(ProtocolError, strPtr(error));
}
} }
else else
{ httpResponseMove(result, memContextPrior());
// On success move the buffer to the prior context
result.httpClient = httpClient;
result.responseHeader = httpHeaderMove(
httpHeaderDup(httpClientResponseHeader(httpClient), NULL), memContextPrior());
result.response = bufMove(response, memContextPrior());
}
} }
MEM_CONTEXT_TEMP_END(); MEM_CONTEXT_TEMP_END();
} }
while (!done); while (!done);
FUNCTION_LOG_RETURN(STORAGE_S3_REQUEST_RESULT, result); FUNCTION_LOG_RETURN(HTTP_RESPONSE, result);
}
HttpResponse *
storageS3Request(StorageS3 *this, const String *verb, const String *uri, StorageS3RequestParam param)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(STORAGE_S3, this);
FUNCTION_LOG_PARAM(STRING, verb);
FUNCTION_LOG_PARAM(STRING, uri);
FUNCTION_LOG_PARAM(HTTP_QUERY, param.query);
FUNCTION_LOG_PARAM(BUFFER, param.content);
FUNCTION_LOG_PARAM(BOOL, param.allowMissing);
FUNCTION_LOG_PARAM(BOOL, param.contentIo);
FUNCTION_LOG_END();
FUNCTION_LOG_RETURN(
HTTP_RESPONSE,
storageS3ResponseP(
storageS3RequestAsyncP(this, verb, uri, .query = param.query, .content = param.content),
.allowMissing = param.allowMissing, .contentIo = param.contentIo));
} }
/*********************************************************************************************************************************** /***********************************************************************************************************************************
@ -469,8 +454,7 @@ storageS3ListInternal(
httpQueryAdd(query, S3_QUERY_PREFIX_STR, queryPrefix); httpQueryAdd(query, S3_QUERY_PREFIX_STR, queryPrefix);
XmlNode *xmlRoot = xmlDocumentRoot( XmlNode *xmlRoot = xmlDocumentRoot(
xmlDocumentNewBuf( xmlDocumentNewBuf(httpResponseContent(storageS3RequestP(this, HTTP_VERB_GET_STR, FSLASH_STR, query))));
storageS3Request(this, HTTP_VERB_GET_STR, FSLASH_STR, query, NULL, true, false).response));
// Get subpath list // Get subpath list
XmlNodeList *subPathList = xmlNodeChildList(xmlRoot, S3_XML_TAG_COMMON_PREFIXES_STR); XmlNodeList *subPathList = xmlNodeChildList(xmlRoot, S3_XML_TAG_COMMON_PREFIXES_STR);
@ -539,17 +523,19 @@ storageS3Info(THIS_VOID, const String *file, StorageInfoLevel level, StorageInte
ASSERT(file != NULL); ASSERT(file != NULL);
// Attempt to get file info // Attempt to get file info
StorageS3RequestResult httpResult = storageS3Request(this, HTTP_VERB_HEAD_STR, file, NULL, NULL, true, true); HttpResponse *httpResponse = storageS3RequestP(this, HTTP_VERB_HEAD_STR, file, .allowMissing = true);
// Does the file exist? // Does the file exist?
StorageInfo result = {.level = level, .exists = httpClientResponseCodeOk(httpResult.httpClient)}; StorageInfo result = {.level = level, .exists = httpResponseCodeOk(httpResponse)};
// Add basic level info if requested and the file exists // Add basic level info if requested and the file exists
if (result.level >= storageInfoLevelBasic && result.exists) if (result.level >= storageInfoLevelBasic && result.exists)
{ {
const HttpHeader *httpHeader = httpResponseHeader(httpResponse);
result.type = storageTypeFile; result.type = storageTypeFile;
result.size = cvtZToUInt64(strPtr(httpHeaderGet(httpResult.responseHeader, HTTP_HEADER_CONTENT_LENGTH_STR))); result.size = cvtZToUInt64(strPtr(httpHeaderGet(httpHeader, HTTP_HEADER_CONTENT_LENGTH_STR)));
result.timeModified = httpDateToTime(httpHeaderGet(httpResult.responseHeader, HTTP_HEADER_LAST_MODIFIED_STR)); result.timeModified = httpDateToTime(httpHeaderGet(httpHeader, HTTP_HEADER_LAST_MODIFIED_STR));
} }
FUNCTION_LOG_RETURN(STORAGE_INFO, result); FUNCTION_LOG_RETURN(STORAGE_INFO, result);
@ -708,12 +694,13 @@ storageS3PathRemoveInternal(StorageS3 *this, XmlDocument *request)
ASSERT(this != NULL); ASSERT(this != NULL);
ASSERT(request != NULL); ASSERT(request != NULL);
Buffer *response = storageS3Request( const Buffer *response = httpResponseContent(
this, HTTP_VERB_POST_STR, FSLASH_STR, httpQueryAdd(httpQueryNew(), S3_QUERY_DELETE_STR, EMPTY_STR), storageS3RequestP(
xmlDocumentBuf(request), true, false).response; this, HTTP_VERB_POST_STR, FSLASH_STR, .query = httpQueryAdd(httpQueryNew(), S3_QUERY_DELETE_STR, EMPTY_STR),
.content = xmlDocumentBuf(request)));
// Nothing is returned when there are no errors // Nothing is returned when there are no errors
if (response != NULL) if (bufSize(response) > 0)
{ {
XmlNodeList *errorList = xmlNodeChildList(xmlDocumentRoot(xmlDocumentNewBuf(response)), S3_XML_TAG_ERROR_STR); XmlNodeList *errorList = xmlNodeChildList(xmlDocumentRoot(xmlDocumentNewBuf(response)), S3_XML_TAG_ERROR_STR);
@ -828,7 +815,7 @@ storageS3Remove(THIS_VOID, const String *file, StorageInterfaceRemoveParam param
ASSERT(file != NULL); ASSERT(file != NULL);
ASSERT(!param.errorOnMissing); ASSERT(!param.errorOnMissing);
storageS3Request(this, HTTP_VERB_DELETE_STR, file, NULL, NULL, true, false); storageS3RequestP(this, HTTP_VERB_DELETE_STR, file);
FUNCTION_LOG_RETURN_VOID(); FUNCTION_LOG_RETURN_VOID();
} }
@ -904,13 +891,14 @@ storageS3New(
.signingKeyDate = YYYYMMDD_STR, .signingKeyDate = YYYYMMDD_STR,
}; };
// Create the HTTP client cache used to service requests // Create the HTTP client used to service requests
driver->httpClientCache = httpClientCacheNew( driver->httpClient = httpClientNew(
host == NULL ? driver->bucketEndpoint : host, driver->port, timeout, verifyPeer, caFile, caPath); host == NULL ? driver->bucketEndpoint : host, driver->port, timeout, verifyPeer, caFile, caPath);
// Create list of redacted headers // Create list of redacted headers
driver->headerRedactList = strLstNew(); driver->headerRedactList = strLstNew();
strLstAdd(driver->headerRedactList, HTTP_HEADER_AUTHORIZATION_STR); strLstAdd(driver->headerRedactList, HTTP_HEADER_AUTHORIZATION_STR);
strLstAdd(driver->headerRedactList, S3_HEADER_DATE_STR);
this = storageNew( this = storageNew(
STORAGE_S3_TYPE_STR, path, 0, 0, write, pathExpressionFunction, driver, driver->interface); STORAGE_S3_TYPE_STR, path, 0, 0, write, pathExpressionFunction, driver, driver->interface);

View File

@ -9,27 +9,52 @@ Object type
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
typedef struct StorageS3 StorageS3; typedef struct StorageS3 StorageS3;
#include "common/io/http/client.h" #include "common/io/http/request.h"
#include "storage/s3/storage.h" #include "storage/s3/storage.h"
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Perform an S3 Request Functions
***********************************************************************************************************************************/ ***********************************************************************************************************************************/
#define FUNCTION_LOG_STORAGE_S3_REQUEST_RESULT_TYPE \ // Perform async request
StorageS3RequestResult typedef struct StorageS3RequestAsyncParam
#define FUNCTION_LOG_STORAGE_S3_REQUEST_RESULT_FORMAT(value, buffer, bufferSize) \
objToLog(&value, "StorageS3RequestResult", buffer, bufferSize)
typedef struct StorageS3RequestResult
{ {
HttpClient *httpClient; VAR_PARAM_HEADER;
HttpHeader *responseHeader; const HttpQuery *query;
Buffer *response; const Buffer *content;
} StorageS3RequestResult; } StorageS3RequestAsyncParam;
StorageS3RequestResult storageS3Request( #define storageS3RequestAsyncP(this, verb, uri, ...) \
StorageS3 *this, const String *verb, const String *uri, const HttpQuery *query, const Buffer *body, bool returnContent, storageS3RequestAsync(this, verb, uri, (StorageS3RequestAsyncParam){VAR_PARAM_INIT, __VA_ARGS__})
bool allowMissing);
HttpRequest *storageS3RequestAsync(StorageS3 *this, const String *verb, const String *uri, StorageS3RequestAsyncParam param);
// Get async response
typedef struct StorageS3ResponseParam
{
VAR_PARAM_HEADER;
bool allowMissing;
bool contentIo;
} StorageS3ResponseParam;
#define storageS3ResponseP(request, ...) \
storageS3Response(request, (StorageS3ResponseParam){VAR_PARAM_INIT, __VA_ARGS__})
HttpResponse *storageS3Response(HttpRequest *request, StorageS3ResponseParam param);
// Perform sync request
typedef struct StorageS3RequestParam
{
VAR_PARAM_HEADER;
const HttpQuery *query;
const Buffer *content;
bool allowMissing;
bool contentIo;
} StorageS3RequestParam;
#define storageS3RequestP(this, verb, uri, ...) \
storageS3Request(this, verb, uri, (StorageS3RequestParam){VAR_PARAM_INIT, __VA_ARGS__})
HttpResponse *storageS3Request(StorageS3 *this, const String *verb, const String *uri, StorageS3RequestParam param);
/*********************************************************************************************************************************** /***********************************************************************************************************************************
Macros for function logging Macros for function logging

View File

@ -37,6 +37,7 @@ typedef struct StorageWriteS3
StorageWriteInterface interface; // Interface StorageWriteInterface interface; // Interface
StorageS3 *storage; // Storage that created this object StorageS3 *storage; // Storage that created this object
HttpRequest *request; // Async request
size_t partSize; size_t partSize;
Buffer *partBuffer; Buffer *partBuffer;
const String *uploadId; const String *uploadId;
@ -86,21 +87,48 @@ storageWriteS3Part(StorageWriteS3 *this)
FUNCTION_LOG_PARAM(STORAGE_WRITE_S3, this); FUNCTION_LOG_PARAM(STORAGE_WRITE_S3, this);
FUNCTION_LOG_END(); FUNCTION_LOG_END();
ASSERT(this != NULL);
// If there is an outstanding async request then wait for the response and store the part id
if (this->request != NULL)
{
strLstAdd(
this->uploadPartList, httpHeaderGet(httpResponseHeader(storageS3ResponseP(this->request)), HTTP_HEADER_ETAG_STR));
ASSERT(strLstGet(this->uploadPartList, strLstSize(this->uploadPartList) - 1) != NULL);
httpRequestFree(this->request);
this->request = NULL;
}
FUNCTION_LOG_RETURN_VOID();
}
static void
storageWriteS3PartAsync(StorageWriteS3 *this)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_WRITE_S3, this);
FUNCTION_LOG_END();
ASSERT(this != NULL); ASSERT(this != NULL);
ASSERT(this->partBuffer != NULL); ASSERT(this->partBuffer != NULL);
ASSERT(bufSize(this->partBuffer) > 0); ASSERT(bufSize(this->partBuffer) > 0);
MEM_CONTEXT_TEMP_BEGIN() MEM_CONTEXT_TEMP_BEGIN()
{ {
// Complete prior async request, if any
storageWriteS3Part(this);
// Get the upload id if we have not already // Get the upload id if we have not already
if (this->uploadId == NULL) if (this->uploadId == NULL)
{ {
// Initiate mult-part upload // Initiate mult-part upload
XmlNode *xmlRoot = xmlDocumentRoot( XmlNode *xmlRoot = xmlDocumentRoot(
xmlDocumentNewBuf( xmlDocumentNewBuf(
storageS3Request( httpResponseContent(
this->storage, HTTP_VERB_POST_STR, this->interface.name, storageS3RequestP(
httpQueryAdd(httpQueryNew(), S3_QUERY_UPLOADS_STR, EMPTY_STR), NULL, true, false).response)); this->storage, HTTP_VERB_POST_STR, this->interface.name,
.query = httpQueryAdd(httpQueryNew(), S3_QUERY_UPLOADS_STR, EMPTY_STR)))));
// Store the upload id // Store the upload id
MEM_CONTEXT_BEGIN(this->memContext) MEM_CONTEXT_BEGIN(this->memContext)
@ -111,19 +139,17 @@ storageWriteS3Part(StorageWriteS3 *this)
MEM_CONTEXT_END(); MEM_CONTEXT_END();
} }
// Upload the part and add etag to part list // Upload the part async
HttpQuery *query = httpQueryNew(); HttpQuery *query = httpQueryNew();
httpQueryAdd(query, S3_QUERY_UPLOAD_ID_STR, this->uploadId); httpQueryAdd(query, S3_QUERY_UPLOAD_ID_STR, this->uploadId);
httpQueryAdd(query, S3_QUERY_PART_NUMBER_STR, strNewFmt("%u", strLstSize(this->uploadPartList) + 1)); httpQueryAdd(query, S3_QUERY_PART_NUMBER_STR, strNewFmt("%u", strLstSize(this->uploadPartList) + 1));
strLstAdd( MEM_CONTEXT_BEGIN(this->memContext)
this->uploadPartList, {
httpHeaderGet( this->request = storageS3RequestAsyncP(
storageS3Request( this->storage, HTTP_VERB_PUT_STR, this->interface.name, .query = query, .content = this->partBuffer);
this->storage, HTTP_VERB_PUT_STR, this->interface.name, query, this->partBuffer, true, false).responseHeader, }
HTTP_HEADER_ETAG_STR)); MEM_CONTEXT_END();
ASSERT(strLstGet(this->uploadPartList, strLstSize(this->uploadPartList) - 1) != NULL);
} }
MEM_CONTEXT_TEMP_END(); MEM_CONTEXT_TEMP_END();
@ -145,6 +171,7 @@ storageWriteS3(THIS_VOID, const Buffer *buffer)
ASSERT(this != NULL); ASSERT(this != NULL);
ASSERT(this->partBuffer != NULL); ASSERT(this->partBuffer != NULL);
ASSERT(buffer != NULL);
size_t bytesTotal = 0; size_t bytesTotal = 0;
@ -160,7 +187,7 @@ storageWriteS3(THIS_VOID, const Buffer *buffer)
// If the part buffer is full then write it // If the part buffer is full then write it
if (bufRemains(this->partBuffer) == 0) if (bufRemains(this->partBuffer) == 0)
{ {
storageWriteS3Part(this); storageWriteS3PartAsync(this);
bufUsedZero(this->partBuffer); bufUsedZero(this->partBuffer);
} }
} }
@ -193,7 +220,10 @@ storageWriteS3Close(THIS_VOID)
{ {
// If there is anything left in the part buffer then write it // If there is anything left in the part buffer then write it
if (bufUsed(this->partBuffer) > 0) if (bufUsed(this->partBuffer) > 0)
storageWriteS3Part(this); storageWriteS3PartAsync(this);
// Complete prior async request, if any
storageWriteS3Part(this);
// Generate the xml part list // Generate the xml part list
XmlDocument *partList = xmlDocumentNew(S3_XML_TAG_COMPLETE_MULTIPART_UPLOAD_STR); XmlDocument *partList = xmlDocumentNew(S3_XML_TAG_COMPLETE_MULTIPART_UPLOAD_STR);
@ -206,16 +236,14 @@ storageWriteS3Close(THIS_VOID)
} }
// Finalize the multi-part upload // Finalize the multi-part upload
storageS3Request( storageS3RequestP(
this->storage, HTTP_VERB_POST_STR, this->interface.name, this->storage, HTTP_VERB_POST_STR, this->interface.name,
httpQueryAdd(httpQueryNew(), S3_QUERY_UPLOAD_ID_STR, this->uploadId), xmlDocumentBuf(partList), true, false); .query = httpQueryAdd(httpQueryNew(), S3_QUERY_UPLOAD_ID_STR, this->uploadId),
.content = xmlDocumentBuf(partList));
} }
// Else upload all the data in a single put // Else upload all the data in a single put
else else
{ storageS3RequestP(this->storage, HTTP_VERB_PUT_STR, this->interface.name, .content = this->partBuffer);
storageS3Request(
this->storage, HTTP_VERB_PUT_STR, this->interface.name, NULL, this->partBuffer, true, false);
}
bufFree(this->partBuffer); bufFree(this->partBuffer);
this->partBuffer = NULL; this->partBuffer = NULL;

View File

@ -250,14 +250,16 @@ unit:
# ---------------------------------------------------------------------------------------------------------------------------- # ----------------------------------------------------------------------------------------------------------------------------
- name: io-http - name: io-http
total: 6 total: 5
coverage: coverage:
common/io/http/cache: full
common/io/http/client: full common/io/http/client: full
common/io/http/common: full common/io/http/common: full
common/io/http/header: full common/io/http/header: full
common/io/http/query: full common/io/http/query: full
common/io/http/request: full
common/io/http/response: full
common/io/http/session: full
# ---------------------------------------------------------------------------------------------------------------------------- # ----------------------------------------------------------------------------------------------------------------------------
- name: compress - name: compress

View File

@ -6,6 +6,9 @@ stanza-create db - fail on missing control file (backup host)
------------------------------------------------------------------------------------------------------------------------------------ ------------------------------------------------------------------------------------------------------------------------------------
P00 INFO: stanza-create command begin [BACKREST-VERSION]: --buffer-size=[BUFFER-SIZE] --compress-level-network=1 --config=[TEST_PATH]/backup/pgbackrest.conf --db-timeout=45 --lock-path=[TEST_PATH]/backup/lock --log-level-console=detail --log-level-file=[LOG-LEVEL-FILE] --log-level-stderr=off --log-path=[TEST_PATH]/backup/log[] --no-log-timestamp --no-online --pg1-host=db-primary --pg1-host-cmd=[BACKREST-BIN] --pg1-host-config=[TEST_PATH]/db-primary/pgbackrest.conf --pg1-host-user=[USER-1] --pg1-path=[TEST_PATH]/db-primary/db/base --protocol-timeout=60 --repo1-cipher-pass=<redacted> --repo1-cipher-type=aes-256-cbc --repo1-path=/ --repo1-s3-bucket=pgbackrest-dev --repo1-s3-endpoint=s3.amazonaws.com --repo1-s3-key=<redacted> --repo1-s3-key-secret=<redacted> --repo1-s3-region=us-east-1 --no-repo1-s3-verify-tls --repo1-type=s3 --stanza=db P00 INFO: stanza-create command begin [BACKREST-VERSION]: --buffer-size=[BUFFER-SIZE] --compress-level-network=1 --config=[TEST_PATH]/backup/pgbackrest.conf --db-timeout=45 --lock-path=[TEST_PATH]/backup/lock --log-level-console=detail --log-level-file=[LOG-LEVEL-FILE] --log-level-stderr=off --log-path=[TEST_PATH]/backup/log[] --no-log-timestamp --no-online --pg1-host=db-primary --pg1-host-cmd=[BACKREST-BIN] --pg1-host-config=[TEST_PATH]/db-primary/pgbackrest.conf --pg1-host-user=[USER-1] --pg1-path=[TEST_PATH]/db-primary/db/base --protocol-timeout=60 --repo1-cipher-pass=<redacted> --repo1-cipher-type=aes-256-cbc --repo1-path=/ --repo1-s3-bucket=pgbackrest-dev --repo1-s3-endpoint=s3.amazonaws.com --repo1-s3-key=<redacted> --repo1-s3-key-secret=<redacted> --repo1-s3-region=us-east-1 --no-repo1-s3-verify-tls --repo1-type=s3 --stanza=db
P00 ERROR: [055]: raised from remote-0 protocol on 'db-primary': unable to open missing file '[TEST_PATH]/db-primary/db/base/global/pg_control' for read P00 ERROR: [055]: raised from remote-0 protocol on 'db-primary': unable to open missing file '[TEST_PATH]/db-primary/db/base/global/pg_control' for read
P00 DETAIL: socket statistics:[SOCKET-STATISTICS]
P00 DETAIL: tls statistics:[TLS-STATISTICS]
P00 INFO: http statistics:[HTTP-STATISTICS]
P00 INFO: stanza-create command end: aborted with exception [055] P00 INFO: stanza-create command end: aborted with exception [055]
stanza-upgrade db - fail on stanza not initialized since archive.info is missing (backup host) stanza-upgrade db - fail on stanza not initialized since archive.info is missing (backup host)

View File

@ -108,7 +108,7 @@ Test that an expected error is actually thrown and error when it isn't
\ \
if (strcmp(errorMessage(), errorMessageExpected) != 0 || errorType() != &errorTypeExpected) \ if (strcmp(errorMessage(), errorMessageExpected) != 0 || errorType() != &errorTypeExpected) \
THROW_FMT( \ THROW_FMT( \
TestError, "EXPECTED %s: %s\n\n BUT GOT %s: %s\n\nTHROWN AT:\n%s", errorTypeName(&errorTypeExpected), \ TestError, "EXPECTED %s: %s\n\nBUT GOT %s: %s\n\nTHROWN AT:\n%s", errorTypeName(&errorTypeExpected), \
errorMessageExpected, errorName(), errorMessage(), errorStackTrace()); \ errorMessageExpected, errorName(), errorMessage(), errorStackTrace()); \
} \ } \
TRY_END(); \ TRY_END(); \

View File

@ -131,7 +131,7 @@ testRun(void)
HttpClient *client = NULL; HttpClient *client = NULL;
// Reset statistics // Reset statistics
httpClientStatLocal = (HttpClientStat){0}; httpClientStat = (HttpClientStat){0};
TEST_RESULT_STR(httpClientStatStr(), NULL, "no stats yet"); TEST_RESULT_STR(httpClientStatStr(), NULL, "no stats yet");
@ -140,7 +140,7 @@ testRun(void)
"new client"); "new client");
TEST_ERROR_FMT( TEST_ERROR_FMT(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), HostConnectError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), HostConnectError,
"unable to connect to 'localhost:%u': [111] Connection refused", hrnTlsServerPort()); "unable to connect to 'localhost:%u': [111] Connection refused", hrnTlsServerPort());
HARNESS_FORK_BEGIN() HARNESS_FORK_BEGIN()
@ -180,7 +180,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FileReadError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FileReadError,
"unexpected eof while reading line"); "unexpected eof while reading line");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -194,7 +194,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FormatError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FormatError,
"HTTP response status 'HTTP/1.0 200 OK' should be CR-terminated"); "HTTP response status 'HTTP/1.0 200 OK' should be CR-terminated");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -208,7 +208,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FormatError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FormatError,
"HTTP response 'HTTP/1.0 200' has invalid length"); "HTTP response 'HTTP/1.0 200' has invalid length");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -222,7 +222,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FormatError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FormatError,
"HTTP version of response 'HTTP/1.0 200 OK' must be HTTP/1.1"); "HTTP version of response 'HTTP/1.0 200 OK' must be HTTP/1.1");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -236,7 +236,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FormatError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FormatError,
"response status '200OK' must have a space after the status code"); "response status '200OK' must have a space after the status code");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -250,7 +250,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FileReadError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FileReadError,
"unexpected eof while reading line"); "unexpected eof while reading line");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -264,7 +264,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FormatError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FormatError,
"header 'header-value' missing colon"); "header 'header-value' missing colon");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -278,7 +278,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FormatError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FormatError,
"only 'chunked' is supported for 'transfer-encoding' header"); "only 'chunked' is supported for 'transfer-encoding' header");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -292,7 +292,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), FormatError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), FormatError,
"'transfer-encoding' and 'content-length' headers are both set"); "'transfer-encoding' and 'content-length' headers are both set");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -306,7 +306,7 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_ERROR( TEST_ERROR(
httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), ServiceError, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), ServiceError,
"[503] Slow Down"); "[503] Slow Down");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
@ -321,7 +321,7 @@ testRun(void)
hrnTlsServerAccept(); hrnTlsServerAccept();
hrnTlsServerExpectZ("GET /?name=%2Fpath%2FA%20Z.txt&type=test HTTP/1.1\r\nhost:myhost.com\r\n\r\n"); hrnTlsServerExpectZ("GET /?name=%2Fpath%2FA%20Z.txt&type=test HTTP/1.1\r\nhost:myhost.com\r\n\r\n");
hrnTlsServerReplyZ("HTTP/1.1 200 OK\r\nkey1:0\r\n key2 : value2\r\nConnection:ack\r\n\r\n"); hrnTlsServerReplyZ("HTTP/1.1 200 OK\r\nkey1:0\r\n key2 : value2\r\nConnection:ack\r\ncontent-length:0\r\n\r\n");
HttpHeader *headerRequest = httpHeaderNew(NULL); HttpHeader *headerRequest = httpHeaderNew(NULL);
httpHeaderAdd(headerRequest, strNew("host"), strNew("myhost.com")); httpHeaderAdd(headerRequest, strNew("host"), strNew("myhost.com"));
@ -332,14 +332,38 @@ testRun(void)
client->timeout = 5000; client->timeout = 5000;
TEST_RESULT_VOID( HttpRequest *request = NULL;
httpClientRequest(client, strNew("GET"), strNew("/"), query, headerRequest, NULL, false), "request"); HttpResponse *response = NULL;
TEST_RESULT_UINT(httpClientResponseCode(client), 200, "check response code");
TEST_RESULT_STR_Z(httpClientResponseMessage(client), "OK", "check response message"); MEM_CONTEXT_TEMP_BEGIN()
TEST_RESULT_UINT(httpClientEof(client), true, "io is eof"); {
TEST_ASSIGN(
request, httpRequestNewP(client, strNew("GET"), strNew("/"), .query = query, .header = headerRequest),
"request");
TEST_ASSIGN(response, httpRequest(request, false), "request");
TEST_RESULT_VOID(httpRequestMove(request, memContextPrior()), "move request");
TEST_RESULT_VOID(httpResponseMove(response, memContextPrior()), "move response");
}
MEM_CONTEXT_TEMP_END();
TEST_RESULT_STR_Z(httpRequestVerb(request), "GET", "check request verb");
TEST_RESULT_STR_Z(httpRequestUri(request), "/", "check request uri");
TEST_RESULT_STR_Z( TEST_RESULT_STR_Z(
httpHeaderToLog(httpClientResponseHeader(client)), "{connection: 'ack', key1: '0', key2: 'value2'}", httpQueryRender(httpRequestQuery(request)), "name=%2Fpath%2FA%20Z.txt&type=test", "check request query");
"check response headers"); TEST_RESULT_PTR_NE(httpRequestHeader(request), NULL, "check request headers");
TEST_RESULT_UINT(httpResponseCode(response), 200, "check response code");
TEST_RESULT_BOOL(httpResponseCodeOk(response), true, "check response code ok");
TEST_RESULT_STR_Z(httpResponseReason(response), "OK", "check response message");
TEST_RESULT_UINT(httpResponseEof(response), true, "io is eof");
TEST_RESULT_STR_Z(
httpHeaderToLog(httpResponseHeader(response)),
"{connection: 'ack', content-length: '0', key1: '0', key2: 'value2'}", "check response headers");
TEST_RESULT_UINT(bufSize(httpResponseContent(response)), 0, "content is empty");
TEST_RESULT_VOID(httpResponseFree(response), "free response");
TEST_RESULT_VOID(httpRequestFree(request), "free request");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("head request with content-length but no content"); TEST_TITLE("head request with content-length but no content");
@ -347,14 +371,13 @@ testRun(void)
hrnTlsServerExpectZ("HEAD / HTTP/1.1\r\n\r\n"); hrnTlsServerExpectZ("HEAD / HTTP/1.1\r\n\r\n");
hrnTlsServerReplyZ("HTTP/1.1 200 OK\r\ncontent-length:380\r\n\r\n"); hrnTlsServerReplyZ("HTTP/1.1 200 OK\r\ncontent-length:380\r\n\r\n");
TEST_RESULT_VOID( TEST_ASSIGN(response, httpRequest(httpRequestNewP(client, strNew("HEAD"), strNew("/")), true), "request");
httpClientRequest(client, strNew("HEAD"), strNew("/"), NULL, httpHeaderNew(NULL), NULL, true), "request"); TEST_RESULT_UINT(httpResponseCode(response), 200, "check response code");
TEST_RESULT_UINT(httpClientResponseCode(client), 200, "check response code"); TEST_RESULT_STR_Z(httpResponseReason(response), "OK", "check response message");
TEST_RESULT_STR_Z(httpClientResponseMessage(client), "OK", "check response message"); TEST_RESULT_BOOL(httpResponseEof(response), true, "io is eof");
TEST_RESULT_BOOL(httpClientEof(client), true, "io is eof"); TEST_RESULT_PTR(response->session, NULL, "session is not busy");
TEST_RESULT_BOOL(httpClientBusy(client), false, "client is not busy");
TEST_RESULT_STR_Z( TEST_RESULT_STR_Z(
httpHeaderToLog(httpClientResponseHeader(client)), "{content-length: '380'}", "check response headers"); httpHeaderToLog(httpResponseHeader(response)), "{content-length: '380'}", "check response headers");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("head request with transfer encoding but no content"); TEST_TITLE("head request with transfer encoding but no content");
@ -362,14 +385,13 @@ testRun(void)
hrnTlsServerExpectZ("HEAD / HTTP/1.1\r\n\r\n"); hrnTlsServerExpectZ("HEAD / HTTP/1.1\r\n\r\n");
hrnTlsServerReplyZ("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n"); hrnTlsServerReplyZ("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n");
TEST_RESULT_VOID( TEST_ASSIGN(response, httpRequest(httpRequestNewP(client, strNew("HEAD"), strNew("/")), true), "request");
httpClientRequest(client, strNew("HEAD"), strNew("/"), NULL, httpHeaderNew(NULL), NULL, true), "request"); TEST_RESULT_UINT(httpResponseCode(response), 200, "check response code");
TEST_RESULT_UINT(httpClientResponseCode(client), 200, "check response code"); TEST_RESULT_STR_Z(httpResponseReason(response), "OK", "check response message");
TEST_RESULT_STR_Z(httpClientResponseMessage(client), "OK", "check response message"); TEST_RESULT_BOOL(httpResponseEof(response), true, "io is eof");
TEST_RESULT_BOOL(httpClientEof(client), true, "io is eof"); TEST_RESULT_PTR(response->session, NULL, "session is not busy");
TEST_RESULT_BOOL(httpClientBusy(client), false, "client is not busy");
TEST_RESULT_STR_Z( TEST_RESULT_STR_Z(
httpHeaderToLog(httpClientResponseHeader(client)), "{transfer-encoding: 'chunked'}", "check response headers"); httpHeaderToLog(httpResponseHeader(response)), "{transfer-encoding: 'chunked'}", "check response headers");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("head request with connection close but no content"); TEST_TITLE("head request with connection close but no content");
@ -379,14 +401,13 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
TEST_RESULT_VOID( TEST_ASSIGN(response, httpRequest(httpRequestNewP(client, strNew("HEAD"), strNew("/")), true), "request");
httpClientRequest(client, strNew("HEAD"), strNew("/"), NULL, httpHeaderNew(NULL), NULL, true), "request"); TEST_RESULT_UINT(httpResponseCode(response), 200, "check response code");
TEST_RESULT_UINT(httpClientResponseCode(client), 200, "check response code"); TEST_RESULT_STR_Z(httpResponseReason(response), "OK", "check response message");
TEST_RESULT_STR_Z(httpClientResponseMessage(client), "OK", "check response message"); TEST_RESULT_BOOL(httpResponseEof(response), true, "io is eof");
TEST_RESULT_BOOL(httpClientEof(client), true, "io is eof"); TEST_RESULT_PTR(response->session, NULL, "session is not busy");
TEST_RESULT_BOOL(httpClientBusy(client), false, "client is not busy");
TEST_RESULT_STR_Z( TEST_RESULT_STR_Z(
httpHeaderToLog(httpClientResponseHeader(client)), "{connection: 'close'}", "check response headers"); httpHeaderToLog(httpResponseHeader(response)), "{connection: 'close'}", "check response headers");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("error with content (with a few slow down errors)"); TEST_TITLE("error with content (with a few slow down errors)");
@ -406,28 +427,59 @@ testRun(void)
hrnTlsServerAccept(); hrnTlsServerAccept();
hrnTlsServerExpectZ("GET / HTTP/1.1\r\n\r\n"); hrnTlsServerExpectZ("GET / HTTP/1.1\r\n\r\n");
hrnTlsServerReplyZ("HTTP/1.1 404 Not Found\r\ncontent-length:0\r\n\r\n"); hrnTlsServerReplyZ("HTTP/1.1 404 Not Found\r\n\r\n");
TEST_RESULT_VOID(httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), "request"); TEST_ASSIGN(request, httpRequestNewP(client, strNew("GET"), strNew("/")), "request");
TEST_RESULT_UINT(httpClientResponseCode(client), 404, "check response code"); TEST_ASSIGN(response, httpRequest(request, false), "response");
TEST_RESULT_STR_Z(httpClientResponseMessage(client), "Not Found", "check response message"); TEST_RESULT_UINT(httpResponseCode(response), 404, "check response code");
TEST_RESULT_BOOL(httpResponseCodeOk(response), false, "check response code error");
TEST_RESULT_STR_Z(httpResponseReason(response), "Not Found", "check response message");
TEST_RESULT_STR_Z( TEST_RESULT_STR_Z(
httpHeaderToLog(httpClientResponseHeader(client)), "{content-length: '0'}", "check response headers"); httpHeaderToLog(httpResponseHeader(response)), "{}", "check response headers");
TEST_ERROR(
httpRequestError(request, response), ProtocolError,
"HTTP request failed with 404 (Not Found):\n"
"*** URI/Query ***:\n"
"/");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("error with content"); TEST_TITLE("error with content");
hrnTlsServerExpectZ("GET / HTTP/1.1\r\n\r\n"); hrnTlsServerExpectZ("GET /?a=b HTTP/1.1\r\nhdr1:1\r\nhdr2:2\r\n\r\n");
hrnTlsServerReplyZ("HTTP/1.1 403 \r\ncontent-length:7\r\n\r\nCONTENT"); hrnTlsServerReplyZ("HTTP/1.1 403 \r\ncontent-length:7\r\n\r\nCONTENT");
Buffer *buffer = NULL; StringList *headerRedact = strLstNew();
strLstAdd(headerRedact, STRDEF("hdr2"));
headerRequest = httpHeaderNew(headerRedact);
httpHeaderAdd(headerRequest, strNew("hdr1"), strNew("1"));
httpHeaderAdd(headerRequest, strNew("hdr2"), strNew("2"));
TEST_ASSIGN(buffer, httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), "request"); TEST_ASSIGN(
TEST_RESULT_UINT(httpClientResponseCode(client), 403, "check response code"); request,
TEST_RESULT_STR_Z(httpClientResponseMessage(client), "", "check empty response message"); httpRequestNewP(
client, strNew("GET"), strNew("/"), .query = httpQueryAdd(httpQueryNew(), STRDEF("a"), STRDEF("b")),
.header = headerRequest),
"request");
TEST_ASSIGN(response, httpRequest(request, false), "response");
TEST_RESULT_UINT(httpResponseCode(response), 403, "check response code");
TEST_RESULT_STR_Z(httpResponseReason(response), "", "check empty response message");
TEST_RESULT_STR_Z( TEST_RESULT_STR_Z(
httpHeaderToLog(httpClientResponseHeader(client)), "{content-length: '7'}", "check response headers"); httpHeaderToLog(httpResponseHeader(response)), "{content-length: '7'}", "check response headers");
TEST_RESULT_STR_Z(strNewBuf(buffer), "CONTENT", "check response"); TEST_RESULT_STR_Z(strNewBuf(httpResponseContent(response)), "CONTENT", "check response content");
TEST_ERROR(
httpRequestError(request, response), ProtocolError,
"HTTP request failed with 403:\n"
"*** URI/Query ***:\n"
"/?a=b\n"
"*** Request Headers ***:\n"
"hdr1: 1\n"
"hdr2: <redacted>\n"
"*** Response Headers ***:\n"
"content-length: 7\n"
"*** Response Content ***:\n"
"CONTENT");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("request with content using content-length"); TEST_TITLE("request with content using content-length");
@ -440,16 +492,17 @@ testRun(void)
ioBufferSizeSet(30); ioBufferSizeSet(30);
TEST_ASSIGN( TEST_ASSIGN(
buffer, response,
httpClientRequest( httpRequest(
client, strNew("GET"), strNew("/path/file 1.txt"), NULL, httpRequestNewP(
httpHeaderAdd(httpHeaderNew(NULL), strNew("content-length"), strNew("30")), client, strNew("GET"), strNew("/path/file 1.txt"),
BUFSTRDEF("012345678901234567890123456789"), true), .header = httpHeaderAdd(httpHeaderNew(NULL), strNew("content-length"), strNew("30")),
.content = BUFSTRDEF("012345678901234567890123456789")), true),
"request"); "request");
TEST_RESULT_STR_Z( TEST_RESULT_STR_Z(
httpHeaderToLog(httpClientResponseHeader(client)), "{connection: 'close'}", "check response headers"); httpHeaderToLog(httpResponseHeader(response)), "{connection: 'close'}", "check response headers");
TEST_RESULT_STR_Z(strNewBuf(buffer), "01234567890123456789012345678901", "check response"); TEST_RESULT_STR_Z(strNewBuf(httpResponseContent(response)), "01234567890123456789012345678901", "check response");
TEST_RESULT_UINT(httpClientRead(client, bufNew(1), true), 0, "call internal read to check eof"); TEST_RESULT_UINT(httpResponseRead(response, bufNew(1), true), 0, "call internal read to check eof");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("request with eof before content complete with retry"); TEST_TITLE("request with eof before content complete with retry");
@ -466,10 +519,10 @@ testRun(void)
hrnTlsServerReplyZ("HTTP/1.1 200 OK\r\ncontent-length:32\r\n\r\n01234567890123456789012345678901"); hrnTlsServerReplyZ("HTTP/1.1 200 OK\r\ncontent-length:32\r\n\r\n01234567890123456789012345678901");
TEST_ASSIGN( TEST_ASSIGN(
buffer, httpClientRequest(client, strNew("GET"), strNew("/path/file 1.txt"), NULL, NULL, NULL, true), response, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/path/file 1.txt")), true),
"request"); "request");
TEST_RESULT_STR_Z(strNewBuf(buffer), "01234567890123456789012345678901", "check response"); TEST_RESULT_STR_Z(strNewBuf(httpResponseContent(response)), "01234567890123456789012345678901", "check response");
TEST_RESULT_UINT(httpClientRead(client, bufNew(1), true), 0, "call internal read to check eof"); TEST_RESULT_UINT(httpResponseRead(response, bufNew(1), true), 0, "call internal read to check eof");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("request with eof before content complete"); TEST_TITLE("request with eof before content complete");
@ -479,12 +532,12 @@ testRun(void)
hrnTlsServerClose(); hrnTlsServerClose();
buffer = bufNew(32); TEST_ASSIGN(
response, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/path/file 1.txt")), false),
TEST_RESULT_VOID( "request");
httpClientRequest(client, strNew("GET"), strNew("/path/file 1.txt"), NULL, NULL, NULL, false), "request"); TEST_RESULT_PTR_NE(response->session, NULL, "session is busy");
TEST_RESULT_BOOL(httpClientBusy(client), true, "client is busy"); TEST_ERROR(ioRead(httpResponseIoRead(response), bufNew(32)), FileReadError, "unexpected EOF reading HTTP content");
TEST_ERROR(ioRead(httpClientIoRead(client), buffer), FileReadError, "unexpected EOF reading HTTP content"); TEST_RESULT_PTR_NE(response->session, NULL, "session is still busy");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("request with chunked content"); TEST_TITLE("request with chunked content");
@ -498,23 +551,19 @@ testRun(void)
"10\r\n0123456789012345\r\n" "10\r\n0123456789012345\r\n"
"0\r\n\r\n"); "0\r\n\r\n");
TEST_RESULT_VOID(httpClientRequest(client, strNew("GET"), strNew("/"), NULL, NULL, NULL, false), "request"); TEST_ASSIGN(response, httpRequest(httpRequestNewP(client, strNew("GET"), strNew("/")), false), "request");
TEST_RESULT_STR_Z( TEST_RESULT_STR_Z(
httpHeaderToLog(httpClientResponseHeader(client)), "{transfer-encoding: 'chunked'}", "check response headers"); httpHeaderToLog(httpResponseHeader(response)), "{transfer-encoding: 'chunked'}", "check response headers");
buffer = bufNew(35); Buffer *buffer = bufNew(35);
TEST_RESULT_VOID(ioRead(httpClientIoRead(client), buffer), "read response"); TEST_RESULT_VOID(ioRead(httpResponseIoRead(response), buffer), "read response");
TEST_RESULT_STR_Z(strNewBuf(buffer), "01234567890123456789012345678901012", "check response"); TEST_RESULT_STR_Z(strNewBuf(buffer), "01234567890123456789012345678901012", "check response");
// ----------------------------------------------------------------------------------------------------------------- // -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("close connection"); TEST_TITLE("close connection and end server process");
hrnTlsServerClose(); hrnTlsServerClose();
TEST_RESULT_VOID(httpClientFree(client), "free client");
// -----------------------------------------------------------------------------------------------------------------
hrnTlsClientEnd(); hrnTlsClientEnd();
} }
HARNESS_FORK_PARENT_END(); HARNESS_FORK_PARENT_END();
@ -527,31 +576,5 @@ testRun(void)
TEST_RESULT_BOOL(httpClientStatStr() != NULL, true, "check"); TEST_RESULT_BOOL(httpClientStatStr() != NULL, true, "check");
} }
// *****************************************************************************************************************************
if (testBegin("HttpClientCache"))
{
HttpClientCache *cache = NULL;
HttpClient *client1 = NULL;
HttpClient *client2 = NULL;
TEST_ASSIGN(
cache, httpClientCacheNew(strNew("localhost"), hrnTlsServerPort(), 5000, true, NULL, NULL), "new HTTP client cache");
TEST_ASSIGN(client1, httpClientCacheGet(cache), "get HTTP client");
TEST_RESULT_PTR(client1, *(HttpClient **)lstGet(cache->clientList, 0), " check HTTP client");
TEST_RESULT_PTR(httpClientCacheGet(cache), *(HttpClient **)lstGet(cache->clientList, 0), " get same HTTP client");
// Make client 1 look like it is busy
client1->ioRead = (IoRead *)1;
TEST_ASSIGN(client2, httpClientCacheGet(cache), "get HTTP client");
TEST_RESULT_PTR(client2, *(HttpClient **)lstGet(cache->clientList, 1), " check HTTP client");
TEST_RESULT_BOOL(client1 != client2, true, "clients are not the same");
// Set back to NULL so bad things don't happen during free
client1->ioRead = NULL;
TEST_RESULT_VOID(httpClientCacheFree(cache), "free HTTP client cache");
}
FUNCTION_HARNESS_RESULT_VOID(); FUNCTION_HARNESS_RESULT_VOID();
} }

View File

@ -202,8 +202,10 @@ testRun(void)
strLstAdd(argList, strNewFmt("--repo1-s3-region=%s", strPtr(region))); strLstAdd(argList, strNewFmt("--repo1-s3-region=%s", strPtr(region)));
strLstAdd(argList, strNewFmt("--repo1-s3-endpoint=%s", strPtr(endPoint))); strLstAdd(argList, strNewFmt("--repo1-s3-endpoint=%s", strPtr(endPoint)));
strLstAdd(argList, strNewFmt("--repo1-s3-host=%s", strPtr(host))); strLstAdd(argList, strNewFmt("--repo1-s3-host=%s", strPtr(host)));
#ifdef TEST_CONTAINER_REQUIRED
strLstAddZ(argList, "--repo1-s3-ca-path=" TLS_CERT_FAKE_PATH); strLstAddZ(argList, "--repo1-s3-ca-path=" TLS_CERT_FAKE_PATH);
strLstAddZ(argList, "--repo1-s3-ca-file=" TLS_CERT_TEST_CERT); strLstAddZ(argList, "--repo1-s3-ca-file=" TLS_CERT_TEST_CERT);
#endif
setenv("PGBACKREST_REPO1_S3_KEY", strPtr(accessKey), true); setenv("PGBACKREST_REPO1_S3_KEY", strPtr(accessKey), true);
setenv("PGBACKREST_REPO1_S3_KEY_SECRET", strPtr(secretAccessKey), true); setenv("PGBACKREST_REPO1_S3_KEY_SECRET", strPtr(secretAccessKey), true);
setenv("PGBACKREST_REPO1_S3_TOKEN", strPtr(securityToken), true); setenv("PGBACKREST_REPO1_S3_TOKEN", strPtr(securityToken), true);
@ -229,8 +231,6 @@ testRun(void)
strLstAdd(argList, strNewFmt("--repo1-s3-bucket=%s", strPtr(bucket))); strLstAdd(argList, strNewFmt("--repo1-s3-bucket=%s", strPtr(bucket)));
strLstAdd(argList, strNewFmt("--repo1-s3-region=%s", strPtr(region))); strLstAdd(argList, strNewFmt("--repo1-s3-region=%s", strPtr(region)));
strLstAdd(argList, strNewFmt("--repo1-s3-endpoint=%s:999", strPtr(endPoint))); strLstAdd(argList, strNewFmt("--repo1-s3-endpoint=%s:999", strPtr(endPoint)));
strLstAddZ(argList, "--repo1-s3-ca-path=" TLS_CERT_FAKE_PATH);
strLstAddZ(argList, "--repo1-s3-ca-file=" TLS_CERT_TEST_CERT);
setenv("PGBACKREST_REPO1_S3_KEY", strPtr(accessKey), true); setenv("PGBACKREST_REPO1_S3_KEY", strPtr(accessKey), true);
setenv("PGBACKREST_REPO1_S3_KEY_SECRET", strPtr(secretAccessKey), true); setenv("PGBACKREST_REPO1_S3_KEY_SECRET", strPtr(secretAccessKey), true);
setenv("PGBACKREST_REPO1_S3_TOKEN", strPtr(securityToken), true); setenv("PGBACKREST_REPO1_S3_TOKEN", strPtr(securityToken), true);
@ -257,8 +257,6 @@ testRun(void)
strLstAdd(argList, strNewFmt("--repo1-s3-region=%s", strPtr(region))); strLstAdd(argList, strNewFmt("--repo1-s3-region=%s", strPtr(region)));
strLstAdd(argList, strNewFmt("--repo1-s3-endpoint=%s:999", strPtr(endPoint))); strLstAdd(argList, strNewFmt("--repo1-s3-endpoint=%s:999", strPtr(endPoint)));
strLstAdd(argList, strNewFmt("--repo1-s3-host=%s:7777", strPtr(host))); strLstAdd(argList, strNewFmt("--repo1-s3-host=%s:7777", strPtr(host)));
strLstAddZ(argList, "--repo1-s3-ca-path=" TLS_CERT_FAKE_PATH);
strLstAddZ(argList, "--repo1-s3-ca-file=" TLS_CERT_TEST_CERT);
setenv("PGBACKREST_REPO1_S3_KEY", strPtr(accessKey), true); setenv("PGBACKREST_REPO1_S3_KEY", strPtr(accessKey), true);
setenv("PGBACKREST_REPO1_S3_KEY_SECRET", strPtr(secretAccessKey), true); setenv("PGBACKREST_REPO1_S3_KEY_SECRET", strPtr(secretAccessKey), true);
setenv("PGBACKREST_REPO1_S3_TOKEN", strPtr(securityToken), true); setenv("PGBACKREST_REPO1_S3_TOKEN", strPtr(securityToken), true);
@ -286,8 +284,6 @@ testRun(void)
strLstAdd(argList, strNewFmt("--repo1-s3-endpoint=%s:999", strPtr(endPoint))); strLstAdd(argList, strNewFmt("--repo1-s3-endpoint=%s:999", strPtr(endPoint)));
strLstAdd(argList, strNewFmt("--repo1-s3-host=%s:7777", strPtr(host))); strLstAdd(argList, strNewFmt("--repo1-s3-host=%s:7777", strPtr(host)));
strLstAddZ(argList, "--repo1-s3-port=9001"); strLstAddZ(argList, "--repo1-s3-port=9001");
strLstAddZ(argList, "--repo1-s3-ca-path=" TLS_CERT_FAKE_PATH);
strLstAddZ(argList, "--repo1-s3-ca-file=" TLS_CERT_TEST_CERT);
setenv("PGBACKREST_REPO1_S3_KEY", strPtr(accessKey), true); setenv("PGBACKREST_REPO1_S3_KEY", strPtr(accessKey), true);
setenv("PGBACKREST_REPO1_S3_KEY_SECRET", strPtr(secretAccessKey), true); setenv("PGBACKREST_REPO1_S3_KEY_SECRET", strPtr(secretAccessKey), true);
setenv("PGBACKREST_REPO1_S3_TOKEN", strPtr(securityToken), true); setenv("PGBACKREST_REPO1_S3_TOKEN", strPtr(securityToken), true);
@ -448,7 +444,7 @@ testRun(void)
TEST_ERROR( TEST_ERROR(
ioReadOpen(storageReadIo(read)), ProtocolError, ioReadOpen(storageReadIo(read)), ProtocolError,
"S3 request failed with 303: \n" "HTTP request failed with 303:\n"
"*** URI/Query ***:\n" "*** URI/Query ***:\n"
"/file.txt\n" "/file.txt\n"
"*** Request Headers ***:\n" "*** Request Headers ***:\n"
@ -627,7 +623,7 @@ testRun(void)
testResponseP(.code = 344); testResponseP(.code = 344);
TEST_ERROR(storageListP(s3, strNew("/")), ProtocolError, TEST_ERROR(storageListP(s3, strNew("/")), ProtocolError,
"S3 request failed with 344: \n" "HTTP request failed with 344:\n"
"*** URI/Query ***:\n" "*** URI/Query ***:\n"
"/?delimiter=%2F&list-type=2\n" "/?delimiter=%2F&list-type=2\n"
"*** Request Headers ***:\n" "*** Request Headers ***:\n"
@ -650,7 +646,7 @@ testRun(void)
"</Error>"); "</Error>");
TEST_ERROR(storageListP(s3, strNew("/")), ProtocolError, TEST_ERROR(storageListP(s3, strNew("/")), ProtocolError,
"S3 request failed with 344: \n" "HTTP request failed with 344:\n"
"*** URI/Query ***:\n" "*** URI/Query ***:\n"
"/?delimiter=%2F&list-type=2\n" "/?delimiter=%2F&list-type=2\n"
"*** Request Headers ***:\n" "*** Request Headers ***:\n"
@ -698,7 +694,7 @@ testRun(void)
"</Error>"); "</Error>");
TEST_ERROR(storageListP(s3, strNew("/")), ProtocolError, TEST_ERROR(storageListP(s3, strNew("/")), ProtocolError,
"S3 request failed with 403: Forbidden\n" "HTTP request failed with 403 (Forbidden):\n"
"*** URI/Query ***:\n" "*** URI/Query ***:\n"
"/?delimiter=%2F&list-type=2\n" "/?delimiter=%2F&list-type=2\n"
"*** Request Headers ***:\n" "*** Request Headers ***:\n"