1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-03-03 14:52:21 +02:00

Add read range to all storage drivers.

The range feature allows reading out an arbitrary chunk of a file and will be important for efficient small file support.

Now that all drivers are required to support ranges remove the storageFeatureLimitRead feature flag that was implemented only by the Posix driver.
This commit is contained in:
David Steele 2022-01-11 14:42:53 -05:00 committed by GitHub
parent 2fd100bf12
commit a79034ae2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 227 additions and 58 deletions

View File

@ -42,6 +42,17 @@
</release-improvement-list>
<release-development-list>
<release-item>
<commit subject="Add read range to all storage drivers."/>
<release-item-contributor-list>
<release-item-contributor id="david.steele"/>
<release-item-reviewer id="reid.thompson"/>
</release-item-contributor-list>
<p>Improve small file support.</p>
</release-item>
<release-item>
<commit subject="Fix inconsistent group display names in messages."/>
<commit subject="Dynamically allocate index to key index map."/>

View File

@ -5,6 +5,7 @@ HTTP Header
#include "common/debug.h"
#include "common/io/http/header.h"
#include "common/io/http/request.h"
#include "common/type/keyValue.h"
/***********************************************************************************************************************************
@ -158,6 +159,32 @@ httpHeaderPut(HttpHeader *this, const String *key, const String *value)
FUNCTION_TEST_RETURN(this);
}
/**********************************************************************************************************************************/
HttpHeader *
httpHeaderPutRange(HttpHeader *const this, const uint64_t offset, const Variant *const limit)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(HTTP_HEADER, this);
FUNCTION_TEST_PARAM(UINT64, offset);
FUNCTION_TEST_PARAM(VARIANT, limit);
FUNCTION_TEST_END();
ASSERT(this != NULL);
ASSERT(limit == NULL || varType(limit) == varTypeUInt64);
if (offset != 0 || limit != NULL)
{
String *const range = strCatFmt(strNew(), HTTP_HEADER_RANGE_BYTES "=%" PRIu64 "-", offset);
if (limit != NULL)
strCatFmt(range, "%" PRIu64, offset + varUInt64(limit) - 1);
httpHeaderPut(this, HTTP_HEADER_RANGE_STR, range);
}
FUNCTION_TEST_RETURN(this);
}
/**********************************************************************************************************************************/
bool
httpHeaderRedact(const HttpHeader *this, const String *key)

View File

@ -42,6 +42,9 @@ httpHeaderMove(HttpHeader *const this, MemContext *const parentNew)
// Put a header
HttpHeader *httpHeaderPut(HttpHeader *this, const String *header, const String *value);
// Put range header when needed
HttpHeader *httpHeaderPutRange(HttpHeader *this, uint64_t offset, const Variant *limit);
// Should the header be redacted when logging?
bool httpHeaderRedact(const HttpHeader *this, const String *key);

View File

@ -33,6 +33,7 @@ STRING_EXTERN(HTTP_HEADER_ETAG_STR, HTTP_HEADER_
STRING_EXTERN(HTTP_HEADER_DATE_STR, HTTP_HEADER_DATE);
STRING_EXTERN(HTTP_HEADER_HOST_STR, HTTP_HEADER_HOST);
STRING_EXTERN(HTTP_HEADER_LAST_MODIFIED_STR, HTTP_HEADER_LAST_MODIFIED);
STRING_EXTERN(HTTP_HEADER_RANGE_STR, HTTP_HEADER_RANGE);
#define HTTP_HEADER_USER_AGENT "user-agent"
// 5xx errors that should always be retried

View File

@ -58,6 +58,9 @@ HTTP Constants
STRING_DECLARE(HTTP_HEADER_HOST_STR);
#define HTTP_HEADER_LAST_MODIFIED "last-modified"
STRING_DECLARE(HTTP_HEADER_LAST_MODIFIED_STR);
#define HTTP_HEADER_RANGE "range"
STRING_DECLARE(HTTP_HEADER_RANGE_STR);
#define HTTP_HEADER_RANGE_BYTES "bytes"
/***********************************************************************************************************************************
Constructors

View File

@ -50,7 +50,9 @@ storageReadAzureOpen(THIS_VOID)
MEM_CONTEXT_BEGIN(THIS_MEM_CONTEXT())
{
this->httpResponse = storageAzureRequestP(
this->storage, HTTP_VERB_GET_STR, .path = this->interface.name, .allowMissing = true, .contentIo = true);
this->storage, HTTP_VERB_GET_STR, .path = this->interface.name,
.header = httpHeaderPutRange(httpHeaderNew(NULL), this->interface.offset, this->interface.limit),
.allowMissing = true, .contentIo = true);
}
MEM_CONTEXT_END();
@ -106,12 +108,16 @@ storageReadAzureEof(THIS_VOID)
/**********************************************************************************************************************************/
StorageRead *
storageReadAzureNew(StorageAzure *storage, const String *name, bool ignoreMissing)
storageReadAzureNew(
StorageAzure *const storage, const String *const name, const bool ignoreMissing, const uint64_t offset,
const Variant *const limit)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_AZURE, storage);
FUNCTION_LOG_PARAM(STRING, name);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(UINT64, offset);
FUNCTION_LOG_PARAM(VARIANT, limit);
FUNCTION_LOG_END();
ASSERT(storage != NULL);
@ -132,6 +138,8 @@ storageReadAzureNew(StorageAzure *storage, const String *name, bool ignoreMissin
.type = STORAGE_AZURE_TYPE,
.name = strDup(name),
.ignoreMissing = ignoreMissing,
.offset = offset,
.limit = varDup(limit),
.ioInterface = (IoReadInterface)
{

View File

@ -10,6 +10,7 @@ Azure Storage Read
/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
StorageRead *storageReadAzureNew(StorageAzure *storage, const String *name, bool ignoreMissing);
StorageRead *storageReadAzureNew(
StorageAzure *storage, const String *name, bool ignoreMissing, uint64_t offset, const Variant *limit);
#endif

View File

@ -142,6 +142,7 @@ storageAzureAuth(
// Generate string to sign
const String *contentLength = httpHeaderGet(httpHeader, HTTP_HEADER_CONTENT_LENGTH_STR);
const String *contentMd5 = httpHeaderGet(httpHeader, HTTP_HEADER_CONTENT_MD5_STR);
const String *const range = httpHeaderGet(httpHeader, HTTP_HEADER_RANGE_STR);
const String *stringToSign = strNewFmt(
"%s\n" // verb
@ -155,12 +156,13 @@ storageAzureAuth(
"\n" // If-Match
"\n" // If-None-Match
"\n" // If-Unmodified-Since
"\n" // range
"%s\n" // range
"%s" // Canonicalized headers
"/%s%s" // Canonicalized account/path
"%s", // Canonicalized query
strZ(verb), strEq(contentLength, ZERO_STR) ? "" : strZ(contentLength), contentMd5 == NULL ? "" : strZ(contentMd5),
strZ(dateTime), strZ(headerCanonical), strZ(this->account), strZ(path), strZ(queryCanonical));
strZ(dateTime), range == NULL ? "" : strZ(range), strZ(headerCanonical), strZ(this->account), strZ(path),
strZ(queryCanonical));
// Generate authorization header
httpHeaderPut(
@ -537,13 +539,14 @@ storageAzureNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageIn
FUNCTION_LOG_PARAM(STORAGE_AZURE, this);
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
(void)param; // No parameters are used
FUNCTION_LOG_PARAM(UINT64, param.offset);
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(file != NULL);
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadAzureNew(this, file, ignoreMissing));
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadAzureNew(this, file, ignoreMissing, param.offset, param.limit));
}
/**********************************************************************************************************************************/

View File

@ -56,7 +56,9 @@ storageReadGcsOpen(THIS_VOID)
MEM_CONTEXT_BEGIN(THIS_MEM_CONTEXT())
{
this->httpResponse = storageGcsRequestP(
this->storage, HTTP_VERB_GET_STR, .object = this->interface.name, .allowMissing = true, .contentIo = true,
this->storage, HTTP_VERB_GET_STR, .object = this->interface.name,
.header = httpHeaderPutRange(httpHeaderNew(NULL), this->interface.offset, this->interface.limit),
.allowMissing = true, .contentIo = true,
.query = httpQueryAdd(httpQueryNewP(), GCS_QUERY_ALT_STR, GCS_QUERY_MEDIA_STR));
}
MEM_CONTEXT_END();
@ -113,12 +115,16 @@ storageReadGcsEof(THIS_VOID)
/**********************************************************************************************************************************/
StorageRead *
storageReadGcsNew(StorageGcs *storage, const String *name, bool ignoreMissing)
storageReadGcsNew(
StorageGcs *const storage, const String *const name, const bool ignoreMissing, const uint64_t offset,
const Variant *const limit)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_GCS, storage);
FUNCTION_LOG_PARAM(STRING, name);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(UINT64, offset);
FUNCTION_LOG_PARAM(VARIANT, limit);
FUNCTION_LOG_END();
ASSERT(storage != NULL);
@ -139,6 +145,8 @@ storageReadGcsNew(StorageGcs *storage, const String *name, bool ignoreMissing)
.type = STORAGE_GCS_TYPE,
.name = strDup(name),
.ignoreMissing = ignoreMissing,
.offset = offset,
.limit = varDup(limit),
.ioInterface = (IoReadInterface)
{

View File

@ -10,6 +10,6 @@ GCS Storage Read
/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
StorageRead *storageReadGcsNew(StorageGcs *storage, const String *name, bool ignoreMissing);
StorageRead *storageReadGcsNew(StorageGcs *storage, const String *name, bool ignoreMissing, uint64_t offset, const Variant *limit);
#endif

View File

@ -761,13 +761,14 @@ storageGcsNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageInte
FUNCTION_LOG_PARAM(STORAGE_GCS, this);
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
(void)param; // No parameters are used
FUNCTION_LOG_PARAM(UINT64, param.offset);
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(file != NULL);
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadGcsNew(this, file, ignoreMissing));
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadGcsNew(this, file, ignoreMissing, param.offset, param.limit));
}
/**********************************************************************************************************************************/

View File

@ -95,6 +95,14 @@ storageReadPosixOpen(THIS_VOID)
result = true;
}
// Seek to offset
if (this->interface.offset != 0)
{
THROW_ON_SYS_ERROR_FMT(
lseek(this->fd, (off_t)this->interface.offset, SEEK_SET) == -1, FileOpenError, STORAGE_ERROR_READ_SEEK,
this->interface.offset, strZ(this->interface.name));
}
FUNCTION_LOG_RETURN(BOOL, result);
}
@ -203,11 +211,14 @@ storageReadPosixFd(const THIS_VOID)
/**********************************************************************************************************************************/
StorageRead *
storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing, const Variant *limit)
storageReadPosixNew(
StoragePosix *const storage, const String *const name, const bool ignoreMissing, const uint64_t offset,
const Variant *const limit)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STRING, name);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(UINT64, offset);
FUNCTION_LOG_PARAM(VARIANT, limit);
FUNCTION_LOG_END();
@ -234,6 +245,7 @@ storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissin
.type = STORAGE_POSIX_TYPE,
.name = strDup(name),
.ignoreMissing = ignoreMissing,
.offset = offset,
.limit = varDup(limit),
.ioInterface = (IoReadInterface)

View File

@ -10,6 +10,7 @@ Posix Storage Read
/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
StorageRead *storageReadPosixNew(StoragePosix *storage, const String *name, bool ignoreMissing, const Variant *limit);
StorageRead *storageReadPosixNew(
StoragePosix *storage, const String *name, bool ignoreMissing, uint64_t offset, const Variant *limit);
#endif

View File

@ -310,13 +310,14 @@ storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageIn
FUNCTION_LOG_PARAM(STORAGE_POSIX, this);
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(UINT64, param.offset);
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(file != NULL);
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadPosixNew(this, file, ignoreMissing, param.limit));
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadPosixNew(this, file, ignoreMissing, param.offset, param.limit));
}
/**********************************************************************************************************************************/
@ -545,7 +546,7 @@ storagePosixRemove(THIS_VOID, const String *file, StorageInterfaceRemoveParam pa
/**********************************************************************************************************************************/
static const StorageInterface storageInterfacePosix =
{
.feature = 1 << storageFeaturePath | 1 << storageFeatureCompress | 1 << storageFeatureLimitRead,
.feature = 1 << storageFeaturePath | 1 << storageFeatureCompress,
.info = storagePosixInfo,
.infoList = storagePosixInfoList,

View File

@ -61,6 +61,13 @@ storageReadName(const StorageRead *const this)
return THIS_PUB(StorageRead)->interface->name;
}
// Is there a read limit? NULL for no limit.
__attribute__((always_inline)) static inline uint64_t
storageReadOffset(const StorageRead *const this)
{
return THIS_PUB(StorageRead)->interface->offset;
}
// Get file type
__attribute__((always_inline)) static inline StringId
storageReadType(const StorageRead *const this)

View File

@ -16,6 +16,7 @@ typedef struct StorageReadInterface
bool compressible; // Is this file compressible?
unsigned int compressLevel; // Level to use for compression
bool ignoreMissing;
uint64_t offset; // Where to start reading in the file
const Variant *limit; // Limit how many bytes are read (NULL for no limit)
IoReadInterface ioInterface;
} StorageReadInterface;

View File

@ -355,12 +355,13 @@ storageRemoteOpenReadProtocol(PackRead *const param, ProtocolServer *const serve
{
const String *file = pckReadStrP(param);
bool ignoreMissing = pckReadBoolP(param);
const uint64_t offset = pckReadU64P(param);
const Variant *limit = jsonToVar(pckReadStrP(param));
const Pack *const filter = pckReadPackP(param);
// Create the read object
IoRead *fileRead = storageReadIo(
storageInterfaceNewReadP(storageRemoteProtocolLocal.driver, file, ignoreMissing, .limit = limit));
storageInterfaceNewReadP(storageRemoteProtocolLocal.driver, file, ignoreMissing, .offset = offset, .limit = limit));
// Set filter group based on passed filters
storageRemoteFilterGroup(ioReadFilterGroup(fileRead), filter);

View File

@ -74,6 +74,7 @@ storageReadRemoteOpen(THIS_VOID)
pckWriteStrP(param, this->interface.name);
pckWriteBoolP(param, this->interface.ignoreMissing);
pckWriteU64P(param, this->interface.offset);
pckWriteStrP(param, jsonFromVar(this->interface.limit));
pckWritePackP(param, ioFilterGroupParamAll(ioReadFilterGroup(storageReadIo(this->read))));
@ -207,8 +208,8 @@ storageReadRemoteEof(THIS_VOID)
/**********************************************************************************************************************************/
StorageRead *
storageReadRemoteNew(
StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
unsigned int compressLevel, const Variant *limit)
StorageRemote *const storage, ProtocolClient *const client, const String *const name, const bool ignoreMissing,
const bool compressible, const unsigned int compressLevel, const uint64_t offset, const Variant *const limit)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage);
@ -217,6 +218,7 @@ storageReadRemoteNew(
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(BOOL, compressible);
FUNCTION_LOG_PARAM(UINT, compressLevel);
FUNCTION_LOG_PARAM(UINT64, offset);
FUNCTION_LOG_PARAM(VARIANT, limit);
FUNCTION_LOG_END();
@ -242,6 +244,7 @@ storageReadRemoteNew(
.compressible = compressible,
.compressLevel = compressLevel,
.ignoreMissing = ignoreMissing,
.offset = offset,
.limit = varDup(limit),
.ioInterface = (IoReadInterface)

View File

@ -13,6 +13,6 @@ Constructors
***********************************************************************************************************************************/
StorageRead *storageReadRemoteNew(
StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
unsigned int compressLevel, const Variant *limit);
unsigned int compressLevel, uint64_t offset, const Variant *limit);
#endif

View File

@ -246,6 +246,7 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageI
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(BOOL, param.compressible);
FUNCTION_LOG_PARAM(UINT64, param.offset);
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
@ -256,7 +257,7 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageI
STORAGE_READ,
storageReadRemoteNew(
this, this->client, file, ignoreMissing, this->compressLevel > 0 ? param.compressible : false, this->compressLevel,
param.limit));
param.offset, param.limit));
}
/**********************************************************************************************************************************/

View File

@ -53,7 +53,9 @@ storageReadS3Open(THIS_VOID)
MEM_CONTEXT_BEGIN(THIS_MEM_CONTEXT())
{
this->httpResponse = storageS3RequestP(
this->storage, HTTP_VERB_GET_STR, this->interface.name, .allowMissing = true, .contentIo = true);
this->storage, HTTP_VERB_GET_STR, this->interface.name,
.header = httpHeaderPutRange(httpHeaderNew(NULL), this->interface.offset, this->interface.limit),
.allowMissing = true, .contentIo = true);
}
MEM_CONTEXT_END();
@ -109,16 +111,20 @@ storageReadS3Eof(THIS_VOID)
/**********************************************************************************************************************************/
StorageRead *
storageReadS3New(StorageS3 *storage, const String *name, bool ignoreMissing)
storageReadS3New(
StorageS3 *const storage, const String *const name, const bool ignoreMissing, const uint64_t offset, const Variant *const limit)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_S3, storage);
FUNCTION_LOG_PARAM(STRING, name);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(UINT64, offset);
FUNCTION_LOG_PARAM(VARIANT, limit);
FUNCTION_LOG_END();
ASSERT(storage != NULL);
ASSERT(name != NULL);
ASSERT(limit == NULL || varUInt64(limit) > 0);
StorageRead *this = NULL;
@ -135,6 +141,8 @@ storageReadS3New(StorageS3 *storage, const String *name, bool ignoreMissing)
.type = STORAGE_S3_TYPE,
.name = strDup(name),
.ignoreMissing = ignoreMissing,
.offset = offset,
.limit = varDup(limit),
.ioInterface = (IoReadInterface)
{

View File

@ -10,6 +10,7 @@ S3 Storage Read
/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
StorageRead *storageReadS3New(StorageS3 *storage, const String *name, bool ignoreMissing);
StorageRead *storageReadS3New(
StorageS3 *storage, const String *name, bool ignoreMissing, uint64_t offset, const Variant *limit);
#endif

View File

@ -420,6 +420,7 @@ storageS3RequestAsync(StorageS3 *this, const String *verb, const String *path, S
FUNCTION_LOG_PARAM(STORAGE_S3, this);
FUNCTION_LOG_PARAM(STRING, verb);
FUNCTION_LOG_PARAM(STRING, path);
FUNCTION_LOG_PARAM(HTTP_HEADER, param.header);
FUNCTION_LOG_PARAM(HTTP_QUERY, param.query);
FUNCTION_LOG_PARAM(BUFFER, param.content);
FUNCTION_LOG_END();
@ -432,7 +433,8 @@ storageS3RequestAsync(StorageS3 *this, const String *verb, const String *path, S
MEM_CONTEXT_TEMP_BEGIN()
{
HttpHeader *requestHeader = httpHeaderNew(this->headerRedactList);
HttpHeader *requestHeader = param.header == NULL ?
httpHeaderNew(this->headerRedactList) : httpHeaderDup(param.header, this->headerRedactList);
// Set content length
httpHeaderAdd(
@ -545,6 +547,7 @@ storageS3Request(StorageS3 *this, const String *verb, const String *path, Storag
FUNCTION_LOG_PARAM(STORAGE_S3, this);
FUNCTION_LOG_PARAM(STRING, verb);
FUNCTION_LOG_PARAM(STRING, path);
FUNCTION_LOG_PARAM(HTTP_HEADER, param.header);
FUNCTION_LOG_PARAM(HTTP_QUERY, param.query);
FUNCTION_LOG_PARAM(BUFFER, param.content);
FUNCTION_LOG_PARAM(BOOL, param.allowMissing);
@ -554,7 +557,7 @@ storageS3Request(StorageS3 *this, const String *verb, const String *path, Storag
FUNCTION_LOG_RETURN(
HTTP_RESPONSE,
storageS3ResponseP(
storageS3RequestAsyncP(this, verb, path, .query = param.query, .content = param.content),
storageS3RequestAsyncP(this, verb, path, .header = param.header, .query = param.query, .content = param.content),
.allowMissing = param.allowMissing, .contentIo = param.contentIo));
}
@ -640,7 +643,7 @@ storageS3ListInternal(
}
// Else get the response immediately from a sync request
else
response = storageS3RequestP(this, HTTP_VERB_GET_STR, FSLASH_STR, query);
response = storageS3RequestP(this, HTTP_VERB_GET_STR, FSLASH_STR, .query = query);
XmlNode *xmlRoot = xmlDocumentRoot(xmlDocumentNewBuf(httpResponseContent(response)));
@ -655,7 +658,7 @@ storageS3ListInternal(
// Store request in the outer temp context
MEM_CONTEXT_PRIOR_BEGIN()
{
request = storageS3RequestAsyncP(this, HTTP_VERB_GET_STR, FSLASH_STR, query);
request = storageS3RequestAsyncP(this, HTTP_VERB_GET_STR, FSLASH_STR, .query = query);
}
MEM_CONTEXT_PRIOR_END();
}
@ -801,13 +804,14 @@ storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing, StorageInter
FUNCTION_LOG_PARAM(STORAGE_S3, this);
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
(void)param; // No parameters are used
FUNCTION_LOG_PARAM(UINT64, param.offset);
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(file != NULL);
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadS3New(this, file, ignoreMissing));
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadS3New(this, file, ignoreMissing, param.offset, param.limit));
}
/**********************************************************************************************************************************/

View File

@ -19,6 +19,7 @@ Functions
typedef struct StorageS3RequestAsyncParam
{
VAR_PARAM_HEADER;
const HttpHeader *header; // Headers
const HttpQuery *query; // Query parameters
const Buffer *content; // Request content
} StorageS3RequestAsyncParam;
@ -45,6 +46,7 @@ HttpResponse *storageS3Response(HttpRequest *request, StorageS3ResponseParam par
typedef struct StorageS3RequestParam
{
VAR_PARAM_HEADER;
const HttpHeader *header; // Headers
const HttpQuery *query; // Query parameters
const Buffer *content; // Request content
bool allowMissing; // Allow missing files (caller can check response code)

View File

@ -608,11 +608,11 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p
FUNCTION_LOG_PARAM(STRING, fileExp);
FUNCTION_LOG_PARAM(BOOL, param.ignoreMissing);
FUNCTION_LOG_PARAM(BOOL, param.compressible);
FUNCTION_LOG_PARAM(UINT64, param.offset);
FUNCTION_LOG_PARAM(VARIANT, param.limit);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(storageFeature(this, storageFeatureLimitRead) || param.limit == NULL);
ASSERT(param.limit == NULL || varType(param.limit) == varTypeUInt64);
StorageRead *result = NULL;
@ -622,7 +622,7 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p
result = storageReadMove(
storageInterfaceNewReadP(
storageDriver(this), storagePathP(this, fileExp), param.ignoreMissing, .compressible = param.compressible,
.limit = param.limit),
.offset = param.offset, .limit = param.limit),
memContextPrior());
}
MEM_CONTEXT_TEMP_END();

View File

@ -43,9 +43,6 @@ typedef enum
// Does the storage support hardlinks? Hardlinks allow the same file to be linked into multiple paths to save space.
storageFeatureHardLink,
// Can the storage limit the amount of data read from a file?
storageFeatureLimitRead,
// Does the storage support symlinks? Symlinks allow paths/files/links to be accessed from another path.
storageFeatureSymLink,
@ -147,6 +144,9 @@ typedef struct StorageNewReadParam
bool ignoreMissing;
bool compressible;
// Where to start reading in the file
const uint64_t offset;
// Limit bytes to read from the file (must be varTypeUInt64). NULL for no limit.
const Variant *limit;
} StorageNewReadParam;

View File

@ -30,6 +30,7 @@ Error messages
#define STORAGE_ERROR_READ_CLOSE "unable to close file '%s' after read"
#define STORAGE_ERROR_READ_OPEN "unable to open file '%s' for read"
#define STORAGE_ERROR_READ_MISSING "unable to open missing file '%s' for read"
#define STORAGE_ERROR_READ_SEEK "unable to seek to %" PRIu64 " in file '%s'"
#define STORAGE_ERROR_INFO "unable to get info for path/file '%s'"
#define STORAGE_ERROR_INFO_MISSING "unable to get info for missing path/file '%s'"
@ -89,6 +90,9 @@ typedef struct StorageInterfaceNewReadParam
// Is the file compressible? This is used when the file must be moved across a network and temporary compression is helpful.
bool compressible;
// Where to start reading in the file
const uint64_t offset;
// Limit bytes read from the file. NULL for no limit.
const Variant *limit;
} StorageInterfaceNewReadParam;

View File

@ -77,7 +77,18 @@ testRun(void)
TEST_RESULT_STR_Z(httpHeaderGet(header, STRDEF("key2")), "value2a, value2b", "get value");
TEST_RESULT_STR(httpHeaderGet(header, STRDEF(BOGUS_STR)), NULL, "get missing value");
TEST_RESULT_STR_Z(httpHeaderToLog(header), "{key1: 'value1', key2: 'value2a, value2b'}", "log output");
TEST_RESULT_STR(httpHeaderGet(httpHeaderPutRange(header, 0, NULL), HTTP_HEADER_RANGE_STR), NULL, "do not put range header");
TEST_RESULT_STR_Z(
httpHeaderGet(httpHeaderPutRange(header, 1, VARUINT64(21)), HTTP_HEADER_RANGE_STR), "bytes=1-21",
"put range header with offset and limit");
TEST_RESULT_STR_Z(
httpHeaderGet(httpHeaderPutRange(header, 0, VARUINT64(21)), HTTP_HEADER_RANGE_STR), "bytes=0-20",
"put range header with offset and limit");
TEST_RESULT_STR_Z(
httpHeaderGet(httpHeaderPutRange(header, 44, NULL), HTTP_HEADER_RANGE_STR), "bytes=44-",
"put range header with offset");
TEST_RESULT_STR_Z(httpHeaderToLog(header), "{key1: 'value1', key2: 'value2a, value2b', range: 'bytes=44-'}", "log output");
TEST_RESULT_VOID(httpHeaderFree(header), "free header");

View File

@ -32,6 +32,7 @@ typedef struct TestRequestParam
VAR_PARAM_HEADER;
const char *content;
const char *blobType;
const char *range;
} TestRequestParam;
#define testRequestP(write, verb, path, ...) \
@ -86,6 +87,10 @@ testRequest(IoWrite *write, const char *verb, const char *path, TestRequestParam
// Add host
strCatFmt(request, "host:%s\r\n", strZ(hrnServerHost()));
// Add range
if (param.range != NULL)
strCatFmt(request, "range:bytes=%s\r\n", param.range);
// Add blob type
if (param.blobType != NULL)
strCatFmt(request, "x-ms-blob-type:%s\r\n", param.blobType);
@ -484,13 +489,14 @@ testRun(void)
"unable to open missing file '/file.txt' for read");
// -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("get file");
TEST_TITLE("get file with offset and limit");
testRequestP(service, HTTP_VERB_GET, "/file.txt");
testRequestP(service, HTTP_VERB_GET, "/file.txt", .range = "1-21");
testResponseP(service, .content = "this is a sample file");
TEST_RESULT_STR_Z(
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt")))), "this is a sample file", "get file");
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(21)))),
"this is a sample file", "get file");
// -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("get zero-length file");

View File

@ -72,8 +72,9 @@ typedef struct TestRequestParam
bool noAuth;
const char *object;
const char *query;
const char *range;
const char *contentRange;
const char *content;
const char *range;
} TestRequestParam;
#define testRequestP(write, verb, ...) \
@ -104,12 +105,16 @@ testRequest(IoWrite *write, const char *verb, TestRequestParam param)
strCatFmt(request, "content-length:%zu\r\n", param.content == NULL ? 0 : strlen(param.content));
// Add content-range
if (param.range != NULL)
strCatFmt(request, "content-range:bytes %s\r\n", param.range);
if (param.contentRange != NULL)
strCatFmt(request, "content-range:bytes %s\r\n", param.contentRange);
// Add host
strCatFmt(request, "host:%s\r\n", strZ(hrnServerHost()));
// Add range
if (param.range != NULL)
strCatFmt(request, "range:bytes=%s\r\n", param.range);
// Complete headers
strCatZ(request, "\r\n");
@ -408,13 +413,14 @@ testRun(void)
"unable to open missing file '/file.txt' for read");
// -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("get file");
TEST_TITLE("get file with offset and limit");
testRequestP(service, HTTP_VERB_GET, .object = "file.txt", .query = "alt=media");
testRequestP(service, HTTP_VERB_GET, .object = "file.txt", .query = "alt=media", .range = "1-21");
testResponseP(service, .content = "this is a sample file");
TEST_RESULT_STR_Z(
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt")))), "this is a sample file", "get file");
strNewBuf(storageGetP(storageNewReadP(storage, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(21)))),
"this is a sample file", "get file");
// -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("switch to auto auth");
@ -577,13 +583,13 @@ testRun(void)
testRequestP(
service, HTTP_VERB_PUT, .upload = true, .noAuth = true,
.query = "name=file.txt&uploadType=resumable&upload_id=ulid1", .range = "0-15/*",
.query = "name=file.txt&uploadType=resumable&upload_id=ulid1", .contentRange = "0-15/*",
.content = "1234567890123456");
testResponseP(service, .code = 308);
testRequestP(
service, HTTP_VERB_PUT, .upload = true, .noAuth = true,
.query = "fields=md5Hash%2Csize&name=file.txt&uploadType=resumable&upload_id=ulid1", .range = "16-31/32",
.query = "fields=md5Hash%2Csize&name=file.txt&uploadType=resumable&upload_id=ulid1", .contentRange = "16-31/32",
.content = "7890123456789012");
testResponseP(service, .content = "{\"md5Hash\":\"dnF5x6K/8ZZRzpfSlMMM+w==\",\"size\":\"32\"}");
@ -598,18 +604,18 @@ testRun(void)
testRequestP(
service, HTTP_VERB_PUT, .upload = true, .noAuth = true,
.query = "name=file.txt&uploadType=resumable&upload_id=ulid2", .range = "0-15/*",
.query = "name=file.txt&uploadType=resumable&upload_id=ulid2", .contentRange = "0-15/*",
.content = "1234567890123456");
testResponseP(service, .code = 503);
testRequestP(
service, HTTP_VERB_PUT, .upload = true, .noAuth = true,
.query = "name=file.txt&uploadType=resumable&upload_id=ulid2", .range = "0-15/*",
.query = "name=file.txt&uploadType=resumable&upload_id=ulid2", .contentRange = "0-15/*",
.content = "1234567890123456");
testResponseP(service, .code = 308);
testRequestP(
service, HTTP_VERB_PUT, .upload = true, .noAuth = true,
.query = "fields=md5Hash%2Csize&name=file.txt&uploadType=resumable&upload_id=ulid2", .range = "16-19/20",
.query = "fields=md5Hash%2Csize&name=file.txt&uploadType=resumable&upload_id=ulid2", .contentRange = "16-19/20",
.content = "7890");
testResponseP(service, .content = "{\"md5Hash\":\"/YXmLZvrRUKHcexohBiycQ==\",\"size\":\"20\"}");
@ -624,7 +630,7 @@ testRun(void)
testRequestP(
service, HTTP_VERB_PUT, .upload = true, .noAuth = true,
.query = "name=file.txt&uploadType=resumable&upload_id=ulid3", .range = "0-15/*",
.query = "name=file.txt&uploadType=resumable&upload_id=ulid3", .contentRange = "0-15/*",
.content = "1234567890123456");
testResponseP(service, .code = 403);

View File

@ -1032,13 +1032,34 @@ testRun(void)
TEST_RESULT_BOOL(memcmp(bufPtrConst(buffer), "TESTFILE\n", bufSize(buffer)) == 0, true, "check content");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("read limited bytes");
TEST_TITLE("error on invalid read offset bytes");
ioBufferSizeSet(2);
TEST_ERROR(
storageGetP(storageNewReadP(storageTest, STRDEF(TEST_PATH "/test.txt"), .offset = UINT64_MAX)), FileOpenError,
"unable to seek to 18446744073709551615 in file '" TEST_PATH "/test.txt': [22] Invalid argument");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("read limited bytes");
TEST_ASSIGN(buffer, storageGetP(storageNewReadP(storageTest, STRDEF(TEST_PATH "/test.txt"), .limit = VARUINT64(7))), "get");
TEST_RESULT_UINT(bufSize(buffer), 7, "check size");
TEST_RESULT_BOOL(memcmp(bufPtrConst(buffer), "TESTFIL", bufSize(buffer)) == 0, true, "check content");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("read offset bytes");
TEST_ASSIGN(buffer, storageGetP(storageNewReadP(storageTest, STRDEF(TEST_PATH "/test.txt"), .offset = 4)), "get");
TEST_RESULT_UINT(bufSize(buffer), 5, "check size");
TEST_RESULT_BOOL(memcmp(bufPtrConst(buffer), "FILE\n", bufSize(buffer)) == 0, true, "check content");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("read offset/limited bytes");
TEST_ASSIGN(
buffer, storageGetP(storageNewReadP(storageTest, STRDEF(TEST_PATH "/test.txt"), .offset = 4,
.limit = VARUINT64(4))), "get");
TEST_RESULT_UINT(bufSize(buffer), 4, "check size");
TEST_RESULT_BOOL(memcmp(bufPtrConst(buffer), "FILE", bufSize(buffer)) == 0, true, "check content");
}
// *****************************************************************************************************************************
@ -1138,7 +1159,8 @@ testRun(void)
MEM_CONTEXT_TEMP_BEGIN()
{
TEST_ASSIGN(
file, storageReadMove(storageNewReadP(storageTest, fileName, .limit = VARUINT64(44)), memContextPrior()),
file,
storageReadMove(storageNewReadP(storageTest, fileName, .limit = VARUINT64(44)), memContextPrior()),
"new read file");
}
MEM_CONTEXT_TEMP_END();
@ -1146,6 +1168,7 @@ testRun(void)
TEST_RESULT_BOOL(ioReadOpen(storageReadIo(file)), true, "open file");
TEST_RESULT_STR(storageReadName(file), fileName, "check file name");
TEST_RESULT_UINT(storageReadType(file), STORAGE_POSIX_TYPE, "check file type");
TEST_RESULT_UINT(storageReadOffset(file), 0, "check offset");
TEST_RESULT_UINT(varUInt64(storageReadLimit(file)), 44, "check limit");
TEST_RESULT_VOID(ioRead(storageReadIo(file), outBuffer), "load data");

View File

@ -27,6 +27,7 @@ typedef struct TestRequestParam
const char *content;
const char *accessKey;
const char *securityToken;
const char *range;
} TestRequestParam;
#define testRequestP(write, s3, verb, path, ...) \
@ -64,7 +65,12 @@ testRequest(IoWrite *write, Storage *s3, const char *verb, const char *path, Tes
if (param.content != NULL)
strCatZ(request, "content-md5;");
strCatZ(request, "host;x-amz-content-sha256;x-amz-date");
strCatZ(request, "host;");
if (param.range != NULL)
strCatZ(request, "range;");
strCatZ(request, "x-amz-content-sha256;x-amz-date");
if (securityToken != NULL)
strCatZ(request, ";x-amz-security-token");
@ -94,6 +100,10 @@ testRequest(IoWrite *write, Storage *s3, const char *verb, const char *path, Tes
else
strCatFmt(request, "host:%s\r\n", strZ(hrnServerHost()));
// Add range
if (param.range != NULL)
strCatFmt(request, "range:bytes=%s\r\n", param.range);
// Add content checksum and date if s3 service
if (s3 != NULL)
{
@ -417,13 +427,14 @@ testRun(void)
"unable to open missing file '/file.txt' for read");
// -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("get file");
TEST_TITLE("get file with offset and limit");
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt");
testRequestP(service, s3, HTTP_VERB_GET, "/file.txt", .range = "1-21");
testResponseP(service, .content = "this is a sample file");
TEST_RESULT_STR_Z(
strNewBuf(storageGetP(storageNewReadP(s3, STRDEF("file.txt")))), "this is a sample file", "get file");
strNewBuf(storageGetP(storageNewReadP(s3, STRDEF("file.txt"), .offset = 1, .limit = VARUINT64(21)))),
"this is a sample file", "get file");
// -----------------------------------------------------------------------------------------------------------------
TEST_TITLE("get zero-length file");