You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-09-16 09:06:18 +02:00
Pass filters to remote storage as a handler array.
The prior code required coverage in the storage/remote module for all filters that could be used remotely. Now the filter handlers are set at runtime so any filter list can be used with a remote. This is more flexible and makes coverage testing easier. It also resolves a test dependency. Move the command/remote unit test near the end so it will have access to all filters without using depends.
This commit is contained in:
@@ -90,6 +90,7 @@
|
|||||||
<commit subject="Add noTruncate flag to storageNewWriteP().">
|
<commit subject="Add noTruncate flag to storageNewWriteP().">
|
||||||
<github-pull-request id="1898"/>
|
<github-pull-request id="1898"/>
|
||||||
</commit>
|
</commit>
|
||||||
|
<commit subject="Pass filters to remote storage as a handler array."/>
|
||||||
|
|
||||||
<release-item-contributor-list>
|
<release-item-contributor-list>
|
||||||
<release-item-contributor id="david.steele"/>
|
<release-item-contributor id="david.steele"/>
|
||||||
|
@@ -5,8 +5,13 @@ Remote Command
|
|||||||
|
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
|
#include "command/backup/pageChecksum.h"
|
||||||
#include "command/control/common.h"
|
#include "command/control/common.h"
|
||||||
|
#include "common/crypto/cipherBlock.h"
|
||||||
|
#include "common/crypto/hash.h"
|
||||||
#include "common/debug.h"
|
#include "common/debug.h"
|
||||||
|
#include "common/io/filter/sink.h"
|
||||||
|
#include "common/io/filter/size.h"
|
||||||
#include "common/log.h"
|
#include "common/log.h"
|
||||||
#include "config/config.h"
|
#include "config/config.h"
|
||||||
#include "config/protocol.h"
|
#include "config/protocol.h"
|
||||||
@@ -25,12 +30,27 @@ static const ProtocolServerHandler commandRemoteHandlerList[] =
|
|||||||
PROTOCOL_SERVER_HANDLER_STORAGE_REMOTE_LIST
|
PROTOCOL_SERVER_HANDLER_STORAGE_REMOTE_LIST
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/***********************************************************************************************************************************
|
||||||
|
Filter handlers
|
||||||
|
***********************************************************************************************************************************/
|
||||||
|
static const StorageRemoteFilterHandler storageRemoteFilterHandlerList[] =
|
||||||
|
{
|
||||||
|
{.type = CIPHER_BLOCK_FILTER_TYPE, .handlerParam = cipherBlockNewPack},
|
||||||
|
{.type = CRYPTO_HASH_FILTER_TYPE, .handlerParam = cryptoHashNewPack},
|
||||||
|
{.type = PAGE_CHECKSUM_FILTER_TYPE, .handlerParam = pageChecksumNewPack},
|
||||||
|
{.type = SINK_FILTER_TYPE, .handlerNoParam = ioSinkNew},
|
||||||
|
{.type = SIZE_FILTER_TYPE, .handlerNoParam = ioSizeNew},
|
||||||
|
};
|
||||||
|
|
||||||
/**********************************************************************************************************************************/
|
/**********************************************************************************************************************************/
|
||||||
void
|
void
|
||||||
cmdRemote(ProtocolServer *const server)
|
cmdRemote(ProtocolServer *const server)
|
||||||
{
|
{
|
||||||
FUNCTION_LOG_VOID(logLevelDebug);
|
FUNCTION_LOG_VOID(logLevelDebug);
|
||||||
|
|
||||||
|
// Set filter handlers
|
||||||
|
storageRemoteFilterHandlerSet(storageRemoteFilterHandlerList, LENGTH_OF(storageRemoteFilterHandlerList));
|
||||||
|
|
||||||
MEM_CONTEXT_TEMP_BEGIN()
|
MEM_CONTEXT_TEMP_BEGIN()
|
||||||
{
|
{
|
||||||
// Acquire a lock if this command needs one. We'll use the noop that is always sent from the client right after the
|
// Acquire a lock if this command needs one. We'll use the noop that is always sent from the client right after the
|
||||||
|
@@ -3,13 +3,8 @@ Remote Storage Protocol Handler
|
|||||||
***********************************************************************************************************************************/
|
***********************************************************************************************************************************/
|
||||||
#include "build.auto.h"
|
#include "build.auto.h"
|
||||||
|
|
||||||
#include "command/backup/pageChecksum.h"
|
|
||||||
#include "common/compress/helper.h"
|
#include "common/compress/helper.h"
|
||||||
#include "common/crypto/cipherBlock.h"
|
|
||||||
#include "common/crypto/hash.h"
|
|
||||||
#include "common/debug.h"
|
#include "common/debug.h"
|
||||||
#include "common/io/filter/sink.h"
|
|
||||||
#include "common/io/filter/size.h"
|
|
||||||
#include "common/io/io.h"
|
#include "common/io/io.h"
|
||||||
#include "common/log.h"
|
#include "common/log.h"
|
||||||
#include "common/regExp.h"
|
#include "common/regExp.h"
|
||||||
@@ -27,8 +22,31 @@ static struct
|
|||||||
{
|
{
|
||||||
MemContext *memContext; // Mem context
|
MemContext *memContext; // Mem context
|
||||||
void *driver; // Storage driver used for requests
|
void *driver; // Storage driver used for requests
|
||||||
|
|
||||||
|
const StorageRemoteFilterHandler *filterHandler; // Filter handler list
|
||||||
|
unsigned int filterHandlerSize; // Filter handler list size
|
||||||
} storageRemoteProtocolLocal;
|
} storageRemoteProtocolLocal;
|
||||||
|
|
||||||
|
/***********************************************************************************************************************************
|
||||||
|
Set filter handlers
|
||||||
|
***********************************************************************************************************************************/
|
||||||
|
void
|
||||||
|
storageRemoteFilterHandlerSet(const StorageRemoteFilterHandler *filterHandler, unsigned int filterHandlerSize)
|
||||||
|
{
|
||||||
|
FUNCTION_TEST_BEGIN();
|
||||||
|
FUNCTION_TEST_PARAM_P(VOID, filterHandler);
|
||||||
|
FUNCTION_TEST_PARAM(UINT, filterHandlerSize);
|
||||||
|
FUNCTION_TEST_END();
|
||||||
|
|
||||||
|
ASSERT(filterHandler != NULL);
|
||||||
|
ASSERT(filterHandlerSize > 0);
|
||||||
|
|
||||||
|
storageRemoteProtocolLocal.filterHandler = filterHandler;
|
||||||
|
storageRemoteProtocolLocal.filterHandlerSize = filterHandlerSize;
|
||||||
|
|
||||||
|
FUNCTION_TEST_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
/***********************************************************************************************************************************
|
/***********************************************************************************************************************************
|
||||||
Set filter group based on passed filters
|
Set filter group based on passed filters
|
||||||
***********************************************************************************************************************************/
|
***********************************************************************************************************************************/
|
||||||
@@ -52,37 +70,51 @@ storageRemoteFilterGroup(IoFilterGroup *const filterGroup, const Pack *const fil
|
|||||||
const StringId filterKey = pckReadStrIdP(filterList);
|
const StringId filterKey = pckReadStrIdP(filterList);
|
||||||
const Pack *const filterParam = pckReadPackP(filterList);
|
const Pack *const filterParam = pckReadPackP(filterList);
|
||||||
|
|
||||||
|
// If a compression filter
|
||||||
IoFilter *filter = compressFilterPack(filterKey, filterParam);
|
IoFilter *filter = compressFilterPack(filterKey, filterParam);
|
||||||
|
|
||||||
if (filter != NULL)
|
if (filter != NULL)
|
||||||
|
{
|
||||||
ioFilterGroupAdd(filterGroup, filter);
|
ioFilterGroupAdd(filterGroup, filter);
|
||||||
|
}
|
||||||
|
// Else a filter handler
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
switch (filterKey)
|
ASSERT(storageRemoteProtocolLocal.filterHandler != NULL);
|
||||||
|
|
||||||
|
// Search for a filter handler
|
||||||
|
unsigned int filterIdx = 0;
|
||||||
|
|
||||||
|
for (; filterIdx < storageRemoteProtocolLocal.filterHandlerSize; filterIdx++)
|
||||||
{
|
{
|
||||||
case CIPHER_BLOCK_FILTER_TYPE:
|
// If a match create the filter
|
||||||
ioFilterGroupAdd(filterGroup, cipherBlockNewPack(filterParam));
|
if (storageRemoteProtocolLocal.filterHandler[filterIdx].type == filterKey)
|
||||||
break;
|
{
|
||||||
|
// Create a filter with parameters
|
||||||
|
if (storageRemoteProtocolLocal.filterHandler[filterIdx].handlerParam != NULL)
|
||||||
|
{
|
||||||
|
ASSERT(filterParam != NULL);
|
||||||
|
|
||||||
case CRYPTO_HASH_FILTER_TYPE:
|
ioFilterGroupAdd(
|
||||||
ioFilterGroupAdd(filterGroup, cryptoHashNewPack(filterParam));
|
filterGroup, storageRemoteProtocolLocal.filterHandler[filterIdx].handlerParam(filterParam));
|
||||||
break;
|
}
|
||||||
|
// Else create a filter without parameters
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ASSERT(storageRemoteProtocolLocal.filterHandler[filterIdx].handlerNoParam != NULL);
|
||||||
|
ASSERT(filterParam == NULL);
|
||||||
|
|
||||||
case PAGE_CHECKSUM_FILTER_TYPE:
|
ioFilterGroupAdd(filterGroup, storageRemoteProtocolLocal.filterHandler[filterIdx].handlerNoParam());
|
||||||
ioFilterGroupAdd(filterGroup, pageChecksumNewPack(filterParam));
|
}
|
||||||
break;
|
|
||||||
|
|
||||||
case SINK_FILTER_TYPE:
|
// Break on filter match
|
||||||
ioFilterGroupAdd(filterGroup, ioSinkNew());
|
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case SIZE_FILTER_TYPE:
|
|
||||||
ioFilterGroupAdd(filterGroup, ioSizeNew());
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
THROW_FMT(AssertError, "unable to add filter '%s'", strZ(strIdToStr(filterKey)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Error when the filter was not found
|
||||||
|
if (filterIdx == storageRemoteProtocolLocal.filterHandlerSize)
|
||||||
|
THROW_FMT(AssertError, "unable to add filter '%s'", strZ(strIdToStr(filterKey)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -48,4 +48,19 @@ Protocol commands for ProtocolServerHandler arrays passed to protocolServerProce
|
|||||||
{.command = PROTOCOL_COMMAND_STORAGE_PATH_SYNC, .handler = storageRemotePathSyncProtocol}, \
|
{.command = PROTOCOL_COMMAND_STORAGE_PATH_SYNC, .handler = storageRemotePathSyncProtocol}, \
|
||||||
{.command = PROTOCOL_COMMAND_STORAGE_REMOVE, .handler = storageRemoteRemoveProtocol},
|
{.command = PROTOCOL_COMMAND_STORAGE_REMOVE, .handler = storageRemoteRemoveProtocol},
|
||||||
|
|
||||||
|
/***********************************************************************************************************************************
|
||||||
|
Filters that may be passed to a remote
|
||||||
|
***********************************************************************************************************************************/
|
||||||
|
typedef IoFilter *(*StorageRemoteFilterHandlerParam)(const Pack *paramList);
|
||||||
|
typedef IoFilter *(*StorageRemoteFilterHandlerNoParam)(void);
|
||||||
|
|
||||||
|
typedef struct StorageRemoteFilterHandler
|
||||||
|
{
|
||||||
|
StringId type; // Filter type
|
||||||
|
StorageRemoteFilterHandlerParam handlerParam; // Handler for filter with parameters
|
||||||
|
StorageRemoteFilterHandlerNoParam handlerNoParam; // Handler with no parameters
|
||||||
|
} StorageRemoteFilterHandler;
|
||||||
|
|
||||||
|
void storageRemoteFilterHandlerSet(const StorageRemoteFilterHandler *filterHandler, unsigned int filterHandlerSize);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@@ -390,7 +390,6 @@ unit:
|
|||||||
- common/lock
|
- common/lock
|
||||||
|
|
||||||
depend:
|
depend:
|
||||||
- command/backup/pageChecksum
|
|
||||||
- common/lock
|
- common/lock
|
||||||
- config/common
|
- config/common
|
||||||
- config/config
|
- config/config
|
||||||
@@ -788,13 +787,6 @@ unit:
|
|||||||
coverage:
|
coverage:
|
||||||
- command/info/info
|
- command/info/info
|
||||||
|
|
||||||
# ----------------------------------------------------------------------------------------------------------------------------
|
|
||||||
- name: remote
|
|
||||||
total: 1
|
|
||||||
|
|
||||||
coverage:
|
|
||||||
- command/remote/remote
|
|
||||||
|
|
||||||
# ----------------------------------------------------------------------------------------------------------------------------
|
# ----------------------------------------------------------------------------------------------------------------------------
|
||||||
- name: backup
|
- name: backup
|
||||||
total: 11
|
total: 11
|
||||||
@@ -857,6 +849,13 @@ unit:
|
|||||||
coverage:
|
coverage:
|
||||||
- command/local/local
|
- command/local/local
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------------------------------------------------------------
|
||||||
|
- name: remote
|
||||||
|
total: 1
|
||||||
|
|
||||||
|
coverage:
|
||||||
|
- command/remote/remote
|
||||||
|
|
||||||
# ----------------------------------------------------------------------------------------------------------------------------
|
# ----------------------------------------------------------------------------------------------------------------------------
|
||||||
- name: exit
|
- name: exit
|
||||||
total: 3
|
total: 3
|
||||||
|
@@ -3,8 +3,11 @@ Test Remote Storage
|
|||||||
***********************************************************************************************************************************/
|
***********************************************************************************************************************************/
|
||||||
#include "command/backup/pageChecksum.h"
|
#include "command/backup/pageChecksum.h"
|
||||||
#include "common/crypto/cipherBlock.h"
|
#include "common/crypto/cipherBlock.h"
|
||||||
|
#include "common/crypto/hash.h"
|
||||||
#include "common/io/bufferRead.h"
|
#include "common/io/bufferRead.h"
|
||||||
#include "common/io/bufferWrite.h"
|
#include "common/io/bufferWrite.h"
|
||||||
|
#include "common/io/filter/sink.h"
|
||||||
|
#include "common/io/filter/size.h"
|
||||||
#include "config/protocol.h"
|
#include "config/protocol.h"
|
||||||
#include "postgres/interface.h"
|
#include "postgres/interface.h"
|
||||||
|
|
||||||
@@ -30,6 +33,17 @@ testRun(void)
|
|||||||
|
|
||||||
hrnProtocolRemoteShimInstall(testRemoteHandlerList, LENGTH_OF(testRemoteHandlerList));
|
hrnProtocolRemoteShimInstall(testRemoteHandlerList, LENGTH_OF(testRemoteHandlerList));
|
||||||
|
|
||||||
|
// Set filter handlers
|
||||||
|
static const StorageRemoteFilterHandler storageRemoteFilterHandlerList[] =
|
||||||
|
{
|
||||||
|
{.type = CIPHER_BLOCK_FILTER_TYPE, .handlerParam = cipherBlockNewPack},
|
||||||
|
{.type = CRYPTO_HASH_FILTER_TYPE, .handlerParam = cryptoHashNewPack},
|
||||||
|
{.type = SINK_FILTER_TYPE, .handlerNoParam = ioSinkNew},
|
||||||
|
{.type = SIZE_FILTER_TYPE, .handlerNoParam = ioSizeNew},
|
||||||
|
};
|
||||||
|
|
||||||
|
storageRemoteFilterHandlerSet(storageRemoteFilterHandlerList, LENGTH_OF(storageRemoteFilterHandlerList));
|
||||||
|
|
||||||
// Test storage
|
// Test storage
|
||||||
Storage *storageTest = storagePosixNewP(TEST_PATH_STR, .write = true);
|
Storage *storageTest = storagePosixNewP(TEST_PATH_STR, .write = true);
|
||||||
|
|
||||||
@@ -330,7 +344,6 @@ testRun(void)
|
|||||||
IoFilterGroup *filterGroup = ioReadFilterGroup(storageReadIo(fileRead));
|
IoFilterGroup *filterGroup = ioReadFilterGroup(storageReadIo(fileRead));
|
||||||
ioFilterGroupAdd(filterGroup, ioSizeNew());
|
ioFilterGroupAdd(filterGroup, ioSizeNew());
|
||||||
ioFilterGroupAdd(filterGroup, cryptoHashNew(hashTypeSha1));
|
ioFilterGroupAdd(filterGroup, cryptoHashNew(hashTypeSha1));
|
||||||
ioFilterGroupAdd(filterGroup, pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, 0));
|
|
||||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRZ("x"), NULL));
|
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRZ("x"), NULL));
|
||||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeDecrypt, cipherTypeAes256Cbc, BUFSTRZ("x"), NULL));
|
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeDecrypt, cipherTypeAes256Cbc, BUFSTRZ("x"), NULL));
|
||||||
ioFilterGroupAdd(filterGroup, compressFilter(compressTypeGz, 3));
|
ioFilterGroupAdd(filterGroup, compressFilter(compressTypeGz, 3));
|
||||||
@@ -341,8 +354,7 @@ testRun(void)
|
|||||||
TEST_RESULT_STR_Z(
|
TEST_RESULT_STR_Z(
|
||||||
hrnPackToStr(ioFilterGroupResultAll(filterGroup)),
|
hrnPackToStr(ioFilterGroupResultAll(filterGroup)),
|
||||||
"1:strid:size, 2:pack:<1:u64:8>, 3:strid:hash, 4:pack:<1:bin:bbbcf2c59433f68f22376cd2439d6cd309378df6>,"
|
"1:strid:size, 2:pack:<1:u64:8>, 3:strid:hash, 4:pack:<1:bin:bbbcf2c59433f68f22376cd2439d6cd309378df6>,"
|
||||||
" 5:strid:pg-chksum, 6:pack:<2:bool:false, 3:bool:false>, 7:strid:cipher-blk, 9:strid:cipher-blk, 11:strid:gz-cmp,"
|
" 5:strid:cipher-blk, 7:strid:cipher-blk, 9:strid:gz-cmp, 11:strid:gz-dcmp, 13:strid:buffer",
|
||||||
" 13:strid:gz-dcmp, 15:strid:buffer",
|
|
||||||
"filter results");
|
"filter results");
|
||||||
|
|
||||||
// Check protocol function directly (file exists but all data goes to sink)
|
// Check protocol function directly (file exists but all data goes to sink)
|
||||||
|
Reference in New Issue
Block a user