You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-09-16 09:06:18 +02:00
Add IoSink filter.
Discard all data passed to the filter. Useful for calculating size/checksum on a remote system when no data needs to be returned. Update ioReadDrain() to automatically use the IoSink filter.
This commit is contained in:
@@ -60,6 +60,7 @@ my @stryCFile =
|
||||
'common/io/filter/buffer.c',
|
||||
'common/io/filter/filter.c',
|
||||
'common/io/filter/group.c',
|
||||
'common/io/filter/sink.c',
|
||||
'common/io/filter/size.c',
|
||||
'common/io/handleWrite.c',
|
||||
'common/io/http/cache.c',
|
||||
|
@@ -81,6 +81,7 @@ SRCS = \
|
||||
common/io/filter/buffer.c \
|
||||
common/io/filter/filter.c \
|
||||
common/io/filter/group.c \
|
||||
common/io/filter/sink.c \
|
||||
common/io/filter/size.c \
|
||||
common/io/handleRead.c \
|
||||
common/io/handleWrite.c \
|
||||
@@ -308,6 +309,9 @@ common/io/filter/filter.o: common/io/filter/filter.c build.auto.h common/assert.
|
||||
common/io/filter/group.o: common/io/filter/group.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/group.h common/io/io.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/list.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/group.c -o common/io/filter/group.o
|
||||
|
||||
common/io/filter/sink.o: common/io/filter/sink.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/sink.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/sink.c -o common/io/filter/sink.o
|
||||
|
||||
common/io/filter/size.o: common/io/filter/size.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/size.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/size.c -o common/io/filter/size.o
|
||||
|
||||
@@ -332,7 +336,7 @@ common/io/http/header.o: common/io/http/header.c build.auto.h common/assert.h co
|
||||
common/io/http/query.o: common/io/http/query.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/http/common.h common/io/http/query.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/http/query.c -o common/io/http/query.o
|
||||
|
||||
common/io/io.o: common/io/io.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/io.h common/log.h common/logLevel.h common/stackTrace.h common/type/convert.h
|
||||
common/io/io.o: common/io/io.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/sink.h common/io/io.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/io.c -o common/io/io.o
|
||||
|
||||
common/io/read.o: common/io/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
@@ -512,7 +516,7 @@ storage/posix/write.o: storage/posix/write.c build.auto.h common/assert.h common
|
||||
storage/read.o: storage/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h storage/read.h storage/read.intern.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/read.c -o storage/read.o
|
||||
|
||||
storage/remote/protocol.o: storage/remote/protocol.c build.auto.h command/backup/pageChecksum.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/crypto/cipherBlock.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/filter/size.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
storage/remote/protocol.o: storage/remote/protocol.c build.auto.h command/backup/pageChecksum.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/crypto/cipherBlock.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/filter/sink.h common/io/filter/size.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/protocol.c -o storage/remote/protocol.o
|
||||
|
||||
storage/remote/read.o: storage/remote/read.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
|
@@ -48,8 +48,6 @@ ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterIn
|
||||
ASSERT(!(interface.in != NULL && interface.inOut != NULL));
|
||||
// If the filter does not produce output then it should produce a result
|
||||
ASSERT(interface.in == NULL || (interface.result != NULL && interface.done == NULL && interface.inputSame == NULL));
|
||||
// Filters that produce output will not always be able to dump all their output and will need to get the same input again
|
||||
ASSERT(interface.inOut == NULL || interface.inputSame != NULL);
|
||||
|
||||
IoFilter *this = memNew(sizeof(IoFilter));
|
||||
this->memContext = memContextCurrent();
|
||||
|
75
src/common/io/filter/sink.c
Normal file
75
src/common/io/filter/sink.c
Normal file
@@ -0,0 +1,75 @@
|
||||
/***********************************************************************************************************************************
|
||||
IO Sink Filter
|
||||
***********************************************************************************************************************************/
|
||||
#include "build.auto.h"
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/io/filter/filter.intern.h"
|
||||
#include "common/io/filter/sink.h"
|
||||
#include "common/log.h"
|
||||
#include "common/memContext.h"
|
||||
#include "common/object.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Filter type constant
|
||||
***********************************************************************************************************************************/
|
||||
STRING_EXTERN(SINK_FILTER_TYPE_STR, SINK_FILTER_TYPE);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Object type
|
||||
***********************************************************************************************************************************/
|
||||
typedef struct IoSink
|
||||
{
|
||||
MemContext *memContext; // Mem context of filter
|
||||
} IoSink;
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Macros for function logging
|
||||
***********************************************************************************************************************************/
|
||||
#define FUNCTION_LOG_IO_SINK_TYPE \
|
||||
IoSink *
|
||||
#define FUNCTION_LOG_IO_SINK_FORMAT(value, buffer, bufferSize) \
|
||||
objToLog(value, "IoSink", buffer, bufferSize)
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Discard all input
|
||||
***********************************************************************************************************************************/
|
||||
static void
|
||||
ioSinkProcess(THIS_VOID, const Buffer *input, Buffer *output)
|
||||
{
|
||||
THIS(IoSink);
|
||||
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(IO_SINK, this);
|
||||
FUNCTION_LOG_PARAM(BUFFER, input);
|
||||
FUNCTION_LOG_PARAM(BUFFER, output);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(input != NULL);
|
||||
ASSERT(output != NULL);
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
New object
|
||||
***********************************************************************************************************************************/
|
||||
IoFilter *
|
||||
ioSinkNew(void)
|
||||
{
|
||||
FUNCTION_LOG_VOID(logLevelTrace);
|
||||
|
||||
IoFilter *this = NULL;
|
||||
|
||||
MEM_CONTEXT_NEW_BEGIN("IoSink")
|
||||
{
|
||||
IoSink *driver = memNew(sizeof(IoSink));
|
||||
driver->memContext = memContextCurrent();
|
||||
|
||||
this = ioFilterNewP(SINK_FILTER_TYPE_STR, driver, NULL, .inOut = ioSinkProcess);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER, this);
|
||||
}
|
23
src/common/io/filter/sink.h
Normal file
23
src/common/io/filter/sink.h
Normal file
@@ -0,0 +1,23 @@
|
||||
/***********************************************************************************************************************************
|
||||
IO Sink Filter
|
||||
|
||||
Consume all bytes sent to the filter without passing any on. This filter is useful when running size/hash filters on a remote when
|
||||
no data should be returned.
|
||||
***********************************************************************************************************************************/
|
||||
#ifndef COMMON_IO_FILTER_SINK_H
|
||||
#define COMMON_IO_FILTER_SINK_H
|
||||
|
||||
#include "common/io/filter/filter.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Filter type constant
|
||||
***********************************************************************************************************************************/
|
||||
#define SINK_FILTER_TYPE "sink"
|
||||
STRING_DECLARE(SINK_FILTER_TYPE_STR);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Constructor
|
||||
***********************************************************************************************************************************/
|
||||
IoFilter *ioSinkNew(void);
|
||||
|
||||
#endif
|
@@ -4,6 +4,7 @@ IO Functions
|
||||
#include "build.auto.h"
|
||||
|
||||
#include "common/debug.h"
|
||||
#include "common/io/filter/sink.h"
|
||||
#include "common/io/io.h"
|
||||
#include "common/log.h"
|
||||
|
||||
@@ -87,22 +88,21 @@ ioReadDrain(IoRead *read)
|
||||
|
||||
ASSERT(read != NULL);
|
||||
|
||||
// Add a sink filter so we only need one read
|
||||
ioFilterGroupAdd(ioReadFilterGroup(read), ioSinkNew());
|
||||
|
||||
// Check if the IO can be opened
|
||||
bool result = ioReadOpen(read);
|
||||
|
||||
if (result)
|
||||
{
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Read IO into the buffer
|
||||
Buffer *buffer = bufNew(ioBufferSize());
|
||||
|
||||
do
|
||||
{
|
||||
ioRead(read, buffer);
|
||||
bufUsedZero(buffer);
|
||||
}
|
||||
while (!ioReadEof(read));
|
||||
// A single read that returns zero bytes
|
||||
ioRead(read, bufNew(1));
|
||||
ASSERT(ioReadEof(read));
|
||||
|
||||
// Close the IO
|
||||
ioReadClose(read);
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
@@ -9,6 +9,7 @@ Remote Storage Protocol Handler
|
||||
#include "common/crypto/cipherBlock.h"
|
||||
#include "common/crypto/hash.h"
|
||||
#include "common/debug.h"
|
||||
#include "common/io/filter/sink.h"
|
||||
#include "common/io/filter/size.h"
|
||||
#include "common/io/io.h"
|
||||
#include "common/log.h"
|
||||
@@ -77,6 +78,8 @@ storageRemoteFilterGroup(IoFilterGroup *filterGroup, const Variant *filterList)
|
||||
ioFilterGroupAdd(filterGroup, cryptoHashNewVar(filterParam));
|
||||
else if (strEq(filterKey, PAGE_CHECKSUM_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, pageChecksumNewVar(filterParam));
|
||||
else if (strEq(filterKey, SINK_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, ioSinkNew());
|
||||
else if (strEq(filterKey, SIZE_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, ioSizeNew());
|
||||
else
|
||||
|
@@ -214,6 +214,7 @@ unit:
|
||||
common/io/filter/buffer: full
|
||||
common/io/filter/filter: full
|
||||
common/io/filter/group: full
|
||||
common/io/filter/sink: full
|
||||
common/io/filter/size: full
|
||||
common/io/handleRead: full
|
||||
common/io/handleWrite: full
|
||||
|
@@ -243,6 +243,30 @@ testRun(void)
|
||||
bufUsedSet(serverWrite, 0);
|
||||
ioBufferSizeSet(8192);
|
||||
|
||||
// Check protocol function directly (file exists but all data goes to sink)
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewStr(strNew("test.txt")));
|
||||
varLstAdd(paramList, varNewBool(false));
|
||||
|
||||
// Create filters to test filter logic
|
||||
filterGroup = ioFilterGroupNew();
|
||||
ioFilterGroupAdd(filterGroup, ioSizeNew());
|
||||
ioFilterGroupAdd(filterGroup, cryptoHashNew(HASH_TYPE_SHA1_STR));
|
||||
ioFilterGroupAdd(filterGroup, ioSinkNew());
|
||||
varLstAdd(paramList, ioFilterGroupParamAll(filterGroup));
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), true, "protocol open read (sink)");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(strNewBuf(serverWrite)),
|
||||
"{\"out\":true}\n"
|
||||
"BRBLOCK0\n"
|
||||
"{\"out\":{\"buffer\":null,\"hash\":\"bbbcf2c59433f68f22376cd2439d6cd309378df6\",\"sink\":null,\"size\":8}}\n",
|
||||
"check result");
|
||||
|
||||
bufUsedSet(serverWrite, 0);
|
||||
|
||||
// Check for error on a bogus filter
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
paramList = varLstNew();
|
||||
|
Reference in New Issue
Block a user