You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-09-16 09:06:18 +02:00
Allow multiple filters to be pushed to the remote and return results.
Previously only a single filter could be pushed to the remote since order was not being maintained. Now the filters are strictly ordered. Results are returned from the remote and set in the local IoFilterGroup so they can be retrieved. Expand remote filter support to include all filters.
This commit is contained in:
@@ -512,7 +512,7 @@ storage/posix/write.o: storage/posix/write.c build.auto.h common/assert.h common
|
||||
storage/read.o: storage/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h storage/read.h storage/read.intern.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/read.c -o storage/read.o
|
||||
|
||||
storage/remote/protocol.o: storage/remote/protocol.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
storage/remote/protocol.o: storage/remote/protocol.c build.auto.h command/backup/pageChecksum.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/crypto/cipherBlock.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/filter/size.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/protocol.c -o storage/remote/protocol.o
|
||||
|
||||
storage/remote/read.o: storage/remote/read.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
|
@@ -229,9 +229,25 @@ pageChecksumNew(unsigned int segmentNo, unsigned int segmentPageTotal, size_t pa
|
||||
driver->valid = true;
|
||||
driver->align = true;
|
||||
|
||||
this = ioFilterNewP(PAGE_CHECKSUM_FILTER_TYPE_STR, driver, NULL, .in = pageChecksumProcess, .result = pageChecksumResult);
|
||||
// Create param list
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewUInt(segmentNo));
|
||||
varLstAdd(paramList, varNewUInt(segmentPageTotal));
|
||||
varLstAdd(paramList, varNewUInt64(pageSize));
|
||||
varLstAdd(paramList, varNewUInt64(lsnLimit));
|
||||
|
||||
this = ioFilterNewP(
|
||||
PAGE_CHECKSUM_FILTER_TYPE_STR, driver, paramList, .in = pageChecksumProcess, .result = pageChecksumResult);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER, this);
|
||||
}
|
||||
|
||||
IoFilter *
|
||||
pageChecksumNewVar(const VariantList *paramList)
|
||||
{
|
||||
return pageChecksumNew(
|
||||
varUIntForce(varLstGet(paramList, 0)), varUIntForce(varLstGet(paramList, 1)), varUIntForce(varLstGet(paramList, 2)),
|
||||
varUInt64(varLstGet(paramList, 3)));
|
||||
}
|
||||
|
@@ -18,5 +18,6 @@ Filter type constant
|
||||
Constructor
|
||||
***********************************************************************************************************************************/
|
||||
IoFilter *pageChecksumNew(unsigned int segmentNo, unsigned int segmentPageTotal, size_t pageSize, uint64_t lsnLimit);
|
||||
IoFilter *pageChecksumNewVar(const VariantList *paramList);
|
||||
|
||||
#endif
|
||||
|
@@ -19,8 +19,7 @@ Block Cipher
|
||||
/***********************************************************************************************************************************
|
||||
Filter type constant
|
||||
***********************************************************************************************************************************/
|
||||
#define CIPHER_BLOCK_FILTER_TYPE "cipherBlock"
|
||||
STRING_STATIC(CIPHER_BLOCK_FILTER_TYPE_STR, CIPHER_BLOCK_FILTER_TYPE);
|
||||
STRING_EXTERN(CIPHER_BLOCK_FILTER_TYPE_STR, CIPHER_BLOCK_FILTER_TYPE);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Header constants and sizes
|
||||
@@ -435,12 +434,29 @@ cipherBlockNew(CipherMode mode, CipherType cipherType, const Buffer *pass, const
|
||||
driver->pass = memNewRaw(driver->passSize);
|
||||
memcpy(driver->pass, bufPtr(pass), driver->passSize);
|
||||
|
||||
// Create param list
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewUInt(mode));
|
||||
varLstAdd(paramList, varNewUInt(cipherType));
|
||||
// ??? Using a string here is not correct since the passphrase is being passed as a buffer so may contain null characters.
|
||||
// However, since strings are used to hold the passphrase in the rest of the code this is currently valid.
|
||||
varLstAdd(paramList, varNewStr(strNewBuf(pass)));
|
||||
varLstAdd(paramList, digestName ? varNewStr(digestName) : NULL);
|
||||
|
||||
// Create filter interface
|
||||
this = ioFilterNewP(
|
||||
CIPHER_BLOCK_FILTER_TYPE_STR, driver, NULL, .done = cipherBlockDone, .inOut = cipherBlockProcess,
|
||||
CIPHER_BLOCK_FILTER_TYPE_STR, driver, paramList, .done = cipherBlockDone, .inOut = cipherBlockProcess,
|
||||
.inputSame = cipherBlockInputSame);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER, this);
|
||||
}
|
||||
|
||||
IoFilter *
|
||||
cipherBlockNewVar(const VariantList *paramList)
|
||||
{
|
||||
return cipherBlockNew(
|
||||
(CipherMode)varUIntForce(varLstGet(paramList, 0)), (CipherType)varUIntForce(varLstGet(paramList, 1)),
|
||||
BUFSTR(varStr(varLstGet(paramList, 2))), varLstGet(paramList, 3) == NULL ? NULL : varStr(varLstGet(paramList, 3)));
|
||||
}
|
||||
|
@@ -7,9 +7,16 @@ Block Cipher Header
|
||||
#include "common/io/filter/filter.h"
|
||||
#include "common/crypto/common.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Filter type constant
|
||||
***********************************************************************************************************************************/
|
||||
#define CIPHER_BLOCK_FILTER_TYPE "cipherBlock"
|
||||
STRING_DECLARE(CIPHER_BLOCK_FILTER_TYPE_STR);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Constructor
|
||||
***********************************************************************************************************************************/
|
||||
IoFilter *cipherBlockNew(CipherMode mode, CipherType cipherType, const Buffer *pass, const String *digestName);
|
||||
IoFilter *cipherBlockNewVar(const VariantList *paramList);
|
||||
|
||||
#endif
|
||||
|
@@ -163,14 +163,24 @@ cryptoHashNew(const String *type)
|
||||
// Initialize context
|
||||
cryptoError(!EVP_DigestInit_ex(driver->hashContext, driver->hashType, NULL), "unable to initialize hash context");
|
||||
|
||||
// Create param list
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewStr(type));
|
||||
|
||||
// Create filter interface
|
||||
this = ioFilterNewP(CRYPTO_HASH_FILTER_TYPE_STR, driver, NULL, .in = cryptoHashProcess, .result = cryptoHashResult);
|
||||
this = ioFilterNewP(CRYPTO_HASH_FILTER_TYPE_STR, driver, paramList, .in = cryptoHashProcess, .result = cryptoHashResult);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER, this);
|
||||
}
|
||||
|
||||
IoFilter *
|
||||
cryptoHashNewVar(const VariantList *paramList)
|
||||
{
|
||||
return cryptoHashNew(varStr(varLstGet(paramList, 0)));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get hash for one C buffer
|
||||
***********************************************************************************************************************************/
|
||||
|
@@ -41,6 +41,7 @@ Hash type sizes
|
||||
Constructor
|
||||
***********************************************************************************************************************************/
|
||||
IoFilter *cryptoHashNew(const String *type);
|
||||
IoFilter *cryptoHashNewVar(const VariantList *paramList);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Helper functions
|
||||
|
@@ -102,6 +102,31 @@ ioFilterGroupAdd(IoFilterGroup *this, IoFilter *filter)
|
||||
FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Insert a filter before an index
|
||||
***********************************************************************************************************************************/
|
||||
IoFilterGroup *
|
||||
ioFilterGroupInsert(IoFilterGroup *this, unsigned int listIdx, IoFilter *filter)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER, filter);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(!this->opened && !this->closed);
|
||||
ASSERT(filter != NULL);
|
||||
|
||||
// Move the filter to this object's mem context
|
||||
ioFilterMove(filter, this->memContext);
|
||||
|
||||
// Add the filter
|
||||
IoFilterData filterData = {.filter = filter};
|
||||
lstInsert(this->filterList, listIdx, &filterData);
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get a filter
|
||||
***********************************************************************************************************************************/
|
||||
@@ -118,6 +143,27 @@ ioFilterGroupGet(const IoFilterGroup *this, unsigned int filterIdx)
|
||||
FUNCTION_TEST_RETURN((IoFilterData *)lstGet(this->filterList, filterIdx));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Clear filters
|
||||
***********************************************************************************************************************************/
|
||||
IoFilterGroup *
|
||||
ioFilterGroupClear(IoFilterGroup *this)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(!this->opened);
|
||||
|
||||
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
|
||||
ioFilterFree(ioFilterGroupGet(this, filterIdx)->filter);
|
||||
|
||||
lstClear(this->filterList);
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER_GROUP, this);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Open filter group
|
||||
|
||||
@@ -401,21 +447,20 @@ ioFilterGroupParamAll(const IoFilterGroup *this)
|
||||
ASSERT(!this->opened);
|
||||
ASSERT(this->filterList != NULL);
|
||||
|
||||
KeyValue *result = kvNew();
|
||||
VariantList *result = varLstNew();
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
|
||||
{
|
||||
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
|
||||
{
|
||||
IoFilter *filter = ioFilterGroupGet(this, filterIdx)->filter;
|
||||
const VariantList *paramList = ioFilterParamList(filter);
|
||||
IoFilter *filter = ioFilterGroupGet(this, filterIdx)->filter;
|
||||
const VariantList *paramList = ioFilterParamList(filter);
|
||||
|
||||
kvAdd(result, VARSTR(ioFilterType(filter)), paramList ? varNewVarLst(paramList) : NULL);
|
||||
}
|
||||
KeyValue *filterParam = kvNew();
|
||||
kvAdd(filterParam, VARSTR(ioFilterType(filter)), paramList ? varNewVarLst(paramList) : NULL);
|
||||
|
||||
varLstAdd(result, varNewKv(filterParam));
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(VARIANT, varNewKv(result));
|
||||
FUNCTION_LOG_RETURN(VARIANT, varNewVarLst(result));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@@ -429,8 +474,7 @@ ioFilterGroupResult(const IoFilterGroup *this, const String *filterType)
|
||||
FUNCTION_LOG_PARAM(STRING, filterType);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(this->opened && this->closed);
|
||||
ASSERT(this->opened);
|
||||
ASSERT(filterType != NULL);
|
||||
|
||||
const Variant *result = NULL;
|
||||
@@ -460,6 +504,30 @@ ioFilterGroupResultAll(const IoFilterGroup *this)
|
||||
FUNCTION_LOG_RETURN_CONST(VARIANT, varNewKv(this->filterResult));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Set all filter results
|
||||
***********************************************************************************************************************************/
|
||||
void
|
||||
ioFilterGroupResultAllSet(IoFilterGroup *this, const Variant *filterResult)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
if (filterResult != NULL)
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(this->memContext)
|
||||
{
|
||||
this->filterResult = kvDup(varKv(filterResult));
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
}
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Return total number of filters
|
||||
***********************************************************************************************************************************/
|
||||
|
@@ -30,6 +30,8 @@ IoFilterGroup *ioFilterGroupNew(void);
|
||||
Functions
|
||||
***********************************************************************************************************************************/
|
||||
IoFilterGroup *ioFilterGroupAdd(IoFilterGroup *this, IoFilter *filter);
|
||||
IoFilterGroup *ioFilterGroupInsert(IoFilterGroup *this, unsigned int listIdx, IoFilter *filter);
|
||||
IoFilterGroup *ioFilterGroupClear(IoFilterGroup *this);
|
||||
void ioFilterGroupOpen(IoFilterGroup *this);
|
||||
void ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output);
|
||||
void ioFilterGroupClose(IoFilterGroup *this);
|
||||
@@ -42,6 +44,7 @@ bool ioFilterGroupInputSame(const IoFilterGroup *this);
|
||||
Variant *ioFilterGroupParamAll(const IoFilterGroup *this);
|
||||
const Variant *ioFilterGroupResult(const IoFilterGroup *this, const String *filterType);
|
||||
const Variant *ioFilterGroupResultAll(const IoFilterGroup *this);
|
||||
void ioFilterGroupResultAllSet(IoFilterGroup *this, const Variant *filterResult);
|
||||
unsigned int ioFilterGroupSize(const IoFilterGroup *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
|
@@ -3,9 +3,13 @@ Remote Storage Protocol Handler
|
||||
***********************************************************************************************************************************/
|
||||
#include "build.auto.h"
|
||||
|
||||
#include "command/backup/pageChecksum.h"
|
||||
#include "common/compress/gzip/compress.h"
|
||||
#include "common/compress/gzip/decompress.h"
|
||||
#include "common/crypto/cipherBlock.h"
|
||||
#include "common/crypto/hash.h"
|
||||
#include "common/debug.h"
|
||||
#include "common/io/filter/size.h"
|
||||
#include "common/io/io.h"
|
||||
#include "common/log.h"
|
||||
#include "common/memContext.h"
|
||||
@@ -56,17 +60,24 @@ storageRemoteFilterGroup(IoFilterGroup *filterGroup, const Variant *filterList)
|
||||
ASSERT(filterGroup != NULL);
|
||||
ASSERT(filterList != NULL);
|
||||
|
||||
const VariantList *filterKeyList = kvKeyList(varKv(filterList));
|
||||
|
||||
for (unsigned int filterIdx = 0; filterIdx < varLstSize(filterKeyList); filterIdx++)
|
||||
for (unsigned int filterIdx = 0; filterIdx < varLstSize(varVarLst(filterList)); filterIdx++)
|
||||
{
|
||||
const String *filterKey = varStr(varLstGet(filterKeyList, filterIdx));
|
||||
const VariantList *filterParam = varVarLst(kvGet(varKv(filterList), varLstGet(filterKeyList, filterIdx)));
|
||||
const KeyValue *filterKv = varKv(varLstGet(varVarLst(filterList), filterIdx));
|
||||
const String *filterKey = varStr(varLstGet(kvKeyList(filterKv), 0));
|
||||
const VariantList *filterParam = varVarLst(kvGet(filterKv, VARSTR(filterKey)));
|
||||
|
||||
if (strEq(filterKey, GZIP_COMPRESS_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNewVar(filterParam));
|
||||
else if (strEq(filterKey, GZIP_DECOMPRESS_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, gzipDecompressNewVar(filterParam));
|
||||
else if (strEq(filterKey, CIPHER_BLOCK_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, cipherBlockNewVar(filterParam));
|
||||
else if (strEq(filterKey, CRYPTO_HASH_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, cryptoHashNewVar(filterParam));
|
||||
else if (strEq(filterKey, PAGE_CHECKSUM_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, pageChecksumNewVar(filterParam));
|
||||
else if (strEq(filterKey, SIZE_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, ioSizeNew());
|
||||
else
|
||||
THROW_FMT(AssertError, "unable to add filter '%s'", strPtr(filterKey));
|
||||
}
|
||||
@@ -151,9 +162,14 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto
|
||||
}
|
||||
while (!ioReadEof(fileRead));
|
||||
|
||||
ioReadClose(fileRead);
|
||||
|
||||
// Write a zero block to show file is complete
|
||||
ioWriteLine(protocolServerIoWrite(server), BUFSTRDEF(PROTOCOL_BLOCK_HEADER "0"));
|
||||
ioWriteFlush(protocolServerIoWrite(server));
|
||||
|
||||
// Push filter results
|
||||
protocolServerResponse(server, ioFilterGroupResultAll(ioReadFilterGroup(fileRead)));
|
||||
}
|
||||
}
|
||||
else if (strEq(command, PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR))
|
||||
@@ -204,14 +220,19 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto
|
||||
else if (remaining == 0)
|
||||
{
|
||||
ioWriteClose(fileWrite);
|
||||
|
||||
// Push filter results
|
||||
protocolServerResponse(server, ioFilterGroupResultAll(ioWriteFilterGroup(fileWrite)));
|
||||
}
|
||||
// Write was aborted so free the file
|
||||
else
|
||||
{
|
||||
ioWriteFree(fileWrite);
|
||||
protocolServerResponse(server, NULL);
|
||||
}
|
||||
}
|
||||
while (remaining > 0);
|
||||
|
||||
protocolServerResponse(server, NULL);
|
||||
}
|
||||
else if (strEq(command, PROTOCOL_COMMAND_STORAGE_PATH_CREATE_STR))
|
||||
{
|
||||
|
@@ -63,27 +63,26 @@ storageReadRemoteOpen(THIS_VOID)
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
|
||||
// If the file is compressible add compression filter on the remote
|
||||
if (this->interface.compressible)
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNew((int)this->interface.compressLevel, true));
|
||||
{
|
||||
ioFilterGroupAdd(
|
||||
ioReadFilterGroup(storageReadIo(this->read)), gzipCompressNew((int)this->interface.compressLevel, true));
|
||||
}
|
||||
|
||||
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR);
|
||||
protocolCommandParamAdd(command, VARSTR(this->interface.name));
|
||||
protocolCommandParamAdd(command, VARBOOL(this->interface.ignoreMissing));
|
||||
protocolCommandParamAdd(command, ioFilterGroupParamAll(filterGroup));
|
||||
protocolCommandParamAdd(command, ioFilterGroupParamAll(ioReadFilterGroup(storageReadIo(this->read))));
|
||||
|
||||
result = varBool(protocolClientExecute(this->client, command, true));
|
||||
|
||||
// Clear filters since they will be run on the remote side
|
||||
ioFilterGroupClear(ioReadFilterGroup(storageReadIo(this->read)));
|
||||
|
||||
// If the file is compressible add decompression filter locally
|
||||
if (this->interface.compressible)
|
||||
{
|
||||
// Since we can't insert filters yet we'll just error if there are already filters in the list
|
||||
CHECK(ioFilterGroupSize(ioReadFilterGroup(storageReadIo(this->read))) == 0);
|
||||
|
||||
ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(this->read)), gzipDecompressNew(true));
|
||||
}
|
||||
|
||||
result = varBool(protocolClientExecute(this->client, command, true));
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
@@ -122,7 +121,11 @@ storageReadRemote(THIS_VOID, Buffer *buffer, bool block)
|
||||
this->remaining = (size_t)storageRemoteProtocolBlockSize(ioReadLine(protocolClientIoRead(this->client)));
|
||||
|
||||
if (this->remaining == 0)
|
||||
{
|
||||
ioFilterGroupResultAllSet(
|
||||
ioReadFilterGroup(storageReadIo(this->read)), protocolClientReadOutput(this->client, true));
|
||||
this->eof = true;
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
this->protocolReadBytes += this->remaining;
|
||||
|
@@ -68,11 +68,9 @@ storageWriteRemoteOpen(THIS_VOID)
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
|
||||
// If the file is compressible add decompression filter on the remote
|
||||
if (this->interface.compressible)
|
||||
ioFilterGroupAdd(filterGroup, gzipDecompressNew(true));
|
||||
ioFilterGroupInsert(ioWriteFilterGroup(storageWriteIo(this->write)), 0, gzipDecompressNew(true));
|
||||
|
||||
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR);
|
||||
protocolCommandParamAdd(command, VARSTR(this->interface.name));
|
||||
@@ -85,17 +83,20 @@ storageWriteRemoteOpen(THIS_VOID)
|
||||
protocolCommandParamAdd(command, VARBOOL(this->interface.syncFile));
|
||||
protocolCommandParamAdd(command, VARBOOL(this->interface.syncPath));
|
||||
protocolCommandParamAdd(command, VARBOOL(this->interface.atomic));
|
||||
protocolCommandParamAdd(command, ioFilterGroupParamAll(filterGroup));
|
||||
protocolCommandParamAdd(command, ioFilterGroupParamAll(ioWriteFilterGroup(storageWriteIo(this->write))));
|
||||
|
||||
// If the file is compressible add compression filter locally
|
||||
protocolClientExecute(this->client, command, false);
|
||||
|
||||
// Clear filters since they will be run on the remote side
|
||||
ioFilterGroupClear(ioWriteFilterGroup(storageWriteIo(this->write)));
|
||||
|
||||
// If the file is compressible add cecompression filter locally
|
||||
if (this->interface.compressible)
|
||||
{
|
||||
ioFilterGroupAdd(
|
||||
ioWriteFilterGroup(storageWriteIo(this->write)), gzipCompressNew((int)this->interface.compressLevel, true));
|
||||
}
|
||||
|
||||
protocolClientExecute(this->client, command, false);
|
||||
|
||||
// Set free callback to ensure remote file is freed
|
||||
memContextCallbackSet(this->memContext, storageWriteRemoteFreeResource, this);
|
||||
}
|
||||
@@ -150,7 +151,7 @@ storageWriteRemoteClose(THIS_VOID)
|
||||
{
|
||||
ioWriteLine(protocolClientIoWrite(this->client), BUFSTRDEF(PROTOCOL_BLOCK_HEADER "0"));
|
||||
ioWriteFlush(protocolClientIoWrite(this->client));
|
||||
protocolClientReadOutput(this->client, false);
|
||||
ioFilterGroupResultAllSet(ioWriteFilterGroup(storageWriteIo(this->write)), protocolClientReadOutput(this->client, true));
|
||||
this->client = NULL;
|
||||
|
||||
memContextCallbackClear(this->memContext);
|
||||
|
@@ -161,8 +161,13 @@ testRun(void)
|
||||
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x00)))->pd_lsn.xrecoff = 0xF0F0F0F0;
|
||||
|
||||
write = ioBufferWriteNew(bufferOut);
|
||||
|
||||
ioFilterGroupAdd(
|
||||
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
|
||||
ioWriteFilterGroup(write),
|
||||
pageChecksumNewVar(
|
||||
varVarLst(
|
||||
jsonToVar(
|
||||
strNewFmt("[0,%u,%u,%" PRIu64 "]", PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)))));
|
||||
ioWriteOpen(write);
|
||||
ioWrite(write, buffer);
|
||||
ioWriteClose(write);
|
||||
|
@@ -1,7 +1,9 @@
|
||||
/***********************************************************************************************************************************
|
||||
Test Block Cipher
|
||||
***********************************************************************************************************************************/
|
||||
#include "common/io/filter/filter.intern.h"
|
||||
#include "common/io/io.h"
|
||||
#include "common/type/json.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Data for testing
|
||||
@@ -104,6 +106,7 @@ testRun(void)
|
||||
Buffer *encryptBuffer = bufNew(TEST_BUFFER_SIZE);
|
||||
|
||||
IoFilter *blockEncryptFilter = cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, testPass, NULL);
|
||||
blockEncryptFilter = cipherBlockNewVar(ioFilterParamList(blockEncryptFilter));
|
||||
CipherBlock *blockEncrypt = (CipherBlock *)ioFilterDriver(blockEncryptFilter);
|
||||
|
||||
TEST_RESULT_INT(
|
||||
@@ -153,7 +156,8 @@ testRun(void)
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
Buffer *decryptBuffer = bufNew(TEST_BUFFER_SIZE);
|
||||
|
||||
IoFilter *blockDecryptFilter = cipherBlockNew(cipherModeDecrypt, cipherTypeAes256Cbc, testPass, NULL);
|
||||
IoFilter *blockDecryptFilter = cipherBlockNew(cipherModeDecrypt, cipherTypeAes256Cbc, testPass, HASH_TYPE_SHA1_STR);
|
||||
blockDecryptFilter = cipherBlockNewVar(ioFilterParamList(blockDecryptFilter));
|
||||
CipherBlock *blockDecrypt = (CipherBlock *)ioFilterDriver(blockDecryptFilter);
|
||||
|
||||
TEST_RESULT_INT(
|
||||
@@ -295,7 +299,7 @@ testRun(void)
|
||||
TEST_RESULT_VOID(ioFilterFree(hash), " free hash");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_ASSIGN(hash, cryptoHashNew(strNew(HASH_TYPE_SHA1)), "create sha1 hash");
|
||||
TEST_ASSIGN(hash, cryptoHashNewVar(varVarLst(jsonToVar(strNewFmt("[\"%s\"]", HASH_TYPE_SHA1)))), "create sha1 hash");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(bufHex(cryptoHash((CryptoHash *)ioFilterDriver(hash)))), "da39a3ee5e6b4b0d3255bfef95601890afd80709",
|
||||
" check empty hash");
|
||||
|
@@ -292,19 +292,25 @@ testRun(void)
|
||||
bufferOriginal = bufNewC("123", 3);
|
||||
|
||||
TEST_ASSIGN(bufferRead, ioBufferReadNew(bufferOriginal), "create buffer read object");
|
||||
|
||||
TEST_RESULT_VOID(ioFilterGroupClear(ioReadFilterGroup(bufferRead)), " clear does nothing when no filters");
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioSizeNew()), " add filter to be cleared");
|
||||
TEST_RESULT_VOID(ioFilterGroupClear(ioReadFilterGroup(bufferRead)), " clear size filter");
|
||||
|
||||
IoFilter *sizeFilter = ioSizeNew();
|
||||
TEST_RESULT_PTR(
|
||||
ioFilterGroupAdd(ioReadFilterGroup(bufferRead), sizeFilter), bufferRead->filterGroup, " add filter to filter group");
|
||||
TEST_RESULT_VOID(
|
||||
ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioTestFilterMultiplyNew("double", 2, 3, 'X')),
|
||||
" add filter to filter group");
|
||||
TEST_RESULT_PTR(
|
||||
ioFilterGroupInsert(ioReadFilterGroup(bufferRead), 0, sizeFilter), bufferRead->filterGroup,
|
||||
" add filter to filter group");
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioSizeNew()), " add filter to filter group");
|
||||
IoFilter *bufferFilter = ioBufferNew();
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), bufferFilter), " add filter to filter group");
|
||||
TEST_RESULT_PTR(ioFilterMove(NULL, memContextTop()), NULL, " move NULL filter to top context");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(jsonFromVar(ioFilterGroupParamAll(ioReadFilterGroup(bufferRead)), 0)),
|
||||
"{\"buffer\":null,\"double\":[\"double\",2,3],\"size\":null}", " check filter params");
|
||||
"[{\"size\":null},{\"double\":[\"double\",2,3]},{\"size\":null},{\"buffer\":null}]", " check filter params");
|
||||
|
||||
TEST_RESULT_BOOL(ioReadOpen(bufferRead), true, " open");
|
||||
TEST_RESULT_INT(ioReadHandle(bufferRead), -1, " handle invalid");
|
||||
@@ -351,6 +357,15 @@ testRun(void)
|
||||
TEST_RESULT_VOID(ioFilterFree(bufferFilter), " free buffer filter");
|
||||
TEST_RESULT_VOID(ioFilterGroupFree(ioReadFilterGroup(bufferRead)), " free filter group object");
|
||||
|
||||
// Set filter group results
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
filterGroup->opened = true;
|
||||
TEST_RESULT_VOID(ioFilterGroupResultAllSet(filterGroup, NULL), "null result");
|
||||
TEST_RESULT_VOID(ioFilterGroupResultAllSet(filterGroup, jsonToVar(strNew("{\"test\":777}"))), "add result");
|
||||
filterGroup->closed = true;
|
||||
TEST_RESULT_UINT(varUInt64(ioFilterGroupResult(filterGroup, strNew("test"))), 777, " check filter result");
|
||||
|
||||
// Read a zero-size buffer to ensure filters are still processed even when there is no input. Some filters (e.g. encryption
|
||||
// and compression) will produce output even if there is no input.
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
|
@@ -1,9 +1,11 @@
|
||||
/***********************************************************************************************************************************
|
||||
Test Remote Storage
|
||||
***********************************************************************************************************************************/
|
||||
#include "command/backup/pageChecksum.h"
|
||||
#include "common/crypto/cipherBlock.h"
|
||||
#include "common/io/bufferRead.h"
|
||||
#include "common/io/bufferWrite.h"
|
||||
#include "postgres/interface.h"
|
||||
|
||||
#include "common/harnessConfig.h"
|
||||
|
||||
@@ -177,7 +179,7 @@ testRun(void)
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewStr(strNew("missing.txt")));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewKv(kvNew()));
|
||||
varLstAdd(paramList, varNewVarLst(varLstNew()));
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), true,
|
||||
@@ -197,6 +199,11 @@ testRun(void)
|
||||
|
||||
// Create filters to test filter logic
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
ioFilterGroupAdd(filterGroup, ioSizeNew());
|
||||
ioFilterGroupAdd(filterGroup, cryptoHashNew(HASH_TYPE_SHA1_STR));
|
||||
ioFilterGroupAdd(filterGroup, pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0));
|
||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRZ("x"), NULL));
|
||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeDecrypt, cipherTypeAes256Cbc, BUFSTRZ("x"), NULL));
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false));
|
||||
ioFilterGroupAdd(filterGroup, gzipDecompressNew(false));
|
||||
varLstAdd(paramList, ioFilterGroupParamAll(filterGroup));
|
||||
@@ -208,7 +215,10 @@ testRun(void)
|
||||
"{\"out\":true}\n"
|
||||
"BRBLOCK4\n"
|
||||
"TESTBRBLOCK4\n"
|
||||
"DATABRBLOCK0\n",
|
||||
"DATABRBLOCK0\n"
|
||||
"{\"out\":{\"buffer\":null,\"cipherBlock\":null,\"gzipCompress\":null,\"gzipDecompress\":null"
|
||||
",\"hash\":\"bbbcf2c59433f68f22376cd2439d6cd309378df6\",\"pageChecksum\":{\"align\":false,\"valid\":false}"
|
||||
",\"size\":8}}\n",
|
||||
"check result");
|
||||
|
||||
bufUsedSet(serverWrite, 0);
|
||||
@@ -219,15 +229,11 @@ testRun(void)
|
||||
paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewStr(strNew("test.txt")));
|
||||
varLstAdd(paramList, varNewBool(false));
|
||||
|
||||
// Create filters to test filter logic
|
||||
filterGroup = ioFilterGroupNew();
|
||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("X"), NULL));
|
||||
varLstAdd(paramList, ioFilterGroupParamAll(filterGroup));
|
||||
varLstAdd(paramList, varNewVarLst(varLstAdd(varLstNew(), varNewKv(kvAdd(kvNew(), varNewStrZ("bogus"), NULL)))));
|
||||
|
||||
TEST_ERROR(
|
||||
storageRemoteProtocol(
|
||||
PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), AssertError, "unable to add filter 'cipherBlock'");
|
||||
PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), AssertError, "unable to add filter 'bogus'");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
@@ -312,7 +318,7 @@ testRun(void)
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewKv(kvNew()));
|
||||
varLstAdd(paramList, ioFilterGroupParamAll(ioFilterGroupAdd(ioFilterGroupNew(), ioSizeNew())));
|
||||
|
||||
// Generate input (includes the input for the test below -- need a way to reset this for better testing)
|
||||
bufCat(
|
||||
@@ -329,7 +335,7 @@ testRun(void)
|
||||
TEST_RESULT_STR(
|
||||
strPtr(strNewBuf(serverWrite)),
|
||||
"{}\n"
|
||||
"{}\n",
|
||||
"{\"out\":{\"buffer\":null,\"size\":18}}\n",
|
||||
"check result");
|
||||
|
||||
TEST_RESULT_STR(
|
||||
@@ -353,7 +359,7 @@ testRun(void)
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewKv(kvNew()));
|
||||
varLstAdd(paramList, varNewVarLst(varLstNew()));
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR, paramList, server), true, "protocol open write");
|
||||
|
Reference in New Issue
Block a user