1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2024-12-14 10:13:05 +02:00

Allow protocol compression when read/writing remote files.

If the file is compressible (i.e. not encrypted or already compressed) it can be marked as such in storageNewRead()/storageNewWrite().  If the file is being read from/written to a remote it will be compressed in transit using gzip.

Simplify filter group handling by having the IoRead/IoWrite objects create the filter group automatically.  This removes the need for a lot of NULL checking and has a negligible effect on performance since a filter group needs to be created eventually unless the source file is missing.

Allow filters to be created using a VariantList so filter parameters can be passed to the remote.
This commit is contained in:
David Steele 2019-06-24 10:20:47 -04:00
parent 62715ebf2d
commit 039e515a31
47 changed files with 529 additions and 296 deletions

View File

@ -216,7 +216,7 @@ command/archive/push/push.o: command/archive/push/push.c build.auto.h command/ar
command/backup/common.o: command/backup/common.c build.auto.h command/backup/common.h common/assert.h common/debug.h common/error.auto.h common/error.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/string.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c command/backup/common.c -o command/backup/common.o
command/backup/pageChecksum.o: command/backup/pageChecksum.c build.auto.h command/backup/pageChecksum.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h postgres/pageChecksum.h
command/backup/pageChecksum.o: command/backup/pageChecksum.c build.auto.h command/backup/pageChecksum.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h postgres/pageChecksum.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c command/backup/pageChecksum.c -o command/backup/pageChecksum.o
command/command.o: command/command.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/tls/client.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h version.h
@ -252,19 +252,19 @@ command/storage/list.o: command/storage/list.c build.auto.h common/assert.h comm
common/compress/gzip/common.o: common/compress/gzip/common.c build.auto.h common/assert.h common/compress/gzip/common.h common/debug.h common/error.auto.h common/error.h common/logLevel.h common/memContext.h common/stackTrace.h common/type/convert.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/common.c -o common/compress/gzip/common.o
common/compress/gzip/compress.o: common/compress/gzip/compress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/compress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
common/compress/gzip/compress.o: common/compress/gzip/compress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/compress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/compress.c -o common/compress/gzip/compress.o
common/compress/gzip/decompress.o: common/compress/gzip/decompress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
common/compress/gzip/decompress.o: common/compress/gzip/decompress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/decompress.c -o common/compress/gzip/decompress.o
common/crypto/cipherBlock.o: common/crypto/cipherBlock.c build.auto.h common/assert.h common/crypto/cipherBlock.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
common/crypto/cipherBlock.o: common/crypto/cipherBlock.c build.auto.h common/assert.h common/crypto/cipherBlock.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/cipherBlock.c -o common/crypto/cipherBlock.o
common/crypto/common.o: common/crypto/common.c build.auto.h common/assert.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/log.h common/logLevel.h common/stackTrace.h common/type/convert.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/common.c -o common/crypto/common.o
common/crypto/hash.o: common/crypto/hash.c build.auto.h common/assert.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
common/crypto/hash.o: common/crypto/hash.c build.auto.h common/assert.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/hash.c -o common/crypto/hash.o
common/debug.o: common/debug.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/logLevel.h common/stackTrace.h common/type/convert.h
@ -297,16 +297,16 @@ common/io/bufferRead.o: common/io/bufferRead.c build.auto.h common/assert.h comm
common/io/bufferWrite.o: common/io/bufferWrite.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/bufferWrite.h common/io/filter/filter.h common/io/filter/group.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/bufferWrite.c -o common/io/bufferWrite.o
common/io/filter/buffer.o: common/io/filter/buffer.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
common/io/filter/buffer.o: common/io/filter/buffer.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/buffer.c -o common/io/filter/buffer.o
common/io/filter/filter.o: common/io/filter/filter.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
common/io/filter/filter.o: common/io/filter/filter.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/filter.c -o common/io/filter/filter.o
common/io/filter/group.o: common/io/filter/group.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/group.h common/io/io.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/list.h common/type/string.h common/type/variant.h common/type/variantList.h
common/io/filter/group.o: common/io/filter/group.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/group.h common/io/io.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/list.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/group.c -o common/io/filter/group.o
common/io/filter/size.o: common/io/filter/size.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/size.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
common/io/filter/size.o: common/io/filter/size.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/size.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/size.c -o common/io/filter/size.o
common/io/handleRead.o: common/io/handleRead.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/handleRead.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
@ -510,16 +510,16 @@ storage/posix/write.o: storage/posix/write.c build.auto.h common/assert.h common
storage/read.o: storage/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h storage/read.h storage/read.intern.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/read.c -o storage/read.o
storage/remote/protocol.o: storage/remote/protocol.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
storage/remote/protocol.o: storage/remote/protocol.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/protocol.c -o storage/remote/protocol.o
storage/remote/read.o: storage/remote/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
storage/remote/read.o: storage/remote/read.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/read.c -o storage/remote/read.o
storage/remote/storage.o: storage/remote/storage.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/storage.c -o storage/remote/storage.o
storage/remote/write.o: storage/remote/write.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
storage/remote/write.o: storage/remote/write.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/write.c -o storage/remote/write.o
storage/s3/read.o: storage/s3/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h storage/info.h storage/read.h storage/read.intern.h storage/s3/read.h storage/s3/storage.h storage/s3/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h

View File

@ -132,6 +132,9 @@ archiveGetFile(
// By default result indicates WAL segment not found
int result = 1;
// Is the file compressible during the copy?
bool compressible = true;
// Test for stop file
lockStopTest();
@ -146,26 +149,27 @@ archiveGetFile(
storage, walDestination, .noCreatePath = true, .noSyncFile = !durable, .noSyncPath = !durable,
.noAtomic = !durable);
// Add filters
IoFilterGroup *filterGroup = ioFilterGroupNew();
// If there is a cipher then add the decrypt filter
if (cipherType != cipherTypeNone)
{
ioFilterGroupAdd(
filterGroup, cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(archiveGetCheckResult.cipherPass), NULL));
ioWriteFilterGroup(storageWriteIo(destination)), cipherBlockNew(cipherModeDecrypt, cipherType,
BUFSTR(archiveGetCheckResult.cipherPass), NULL));
compressible = false;
}
// If file is compressed then add the decompression filter
if (strEndsWithZ(archiveGetCheckResult.archiveFileActual, "." GZIP_EXT))
ioFilterGroupAdd(filterGroup, gzipDecompressNew(false));
ioWriteFilterGroupSet(storageWriteIo(destination), filterGroup);
{
ioFilterGroupAdd(ioWriteFilterGroup(storageWriteIo(destination)), gzipDecompressNew(false));
compressible = false;
}
// Copy the file
storageCopyNP(
storageNewReadNP(
storageRepo(), strNewFmt("%s/%s", STORAGE_REPO_ARCHIVE, strPtr(archiveGetCheckResult.archiveFileActual))),
storageNewReadP(
storageRepo(), strNewFmt("%s/%s", STORAGE_REPO_ARCHIVE, strPtr(archiveGetCheckResult.archiveFileActual)),
.compressible = compressible),
destination);
// The WAL file was found

View File

@ -74,8 +74,7 @@ archivePushFile(
{
// Generate a sha1 checksum for the wal segment. ??? Probably need a function in storage for this.
IoRead *read = storageReadIo(storageNewReadNP(storageLocal(), walSource));
IoFilterGroup *filterGroup = ioFilterGroupAdd(ioFilterGroupNew(), cryptoHashNew(HASH_TYPE_SHA1_STR));
ioReadFilterGroupSet(read, filterGroup);
ioFilterGroupAdd(ioReadFilterGroup(read), cryptoHashNew(HASH_TYPE_SHA1_STR));
Buffer *buffer = bufNew(ioBufferSize());
ioReadOpen(read);
@ -88,7 +87,7 @@ archivePushFile(
while (!ioReadEof(read));
ioReadClose(read);
const String *walSegmentChecksum = varStr(ioFilterGroupResult(filterGroup, CRYPTO_HASH_FILTER_TYPE_STR));
const String *walSegmentChecksum = varStr(ioFilterGroupResult(ioReadFilterGroup(read), CRYPTO_HASH_FILTER_TYPE_STR));
// If the wal segment already exists in the repo then compare checksums
walSegmentFile = walSegmentFind(storageRepo(), archiveId, archiveFile);
@ -119,27 +118,32 @@ archivePushFile(
{
StorageRead *source = storageNewReadNP(storageLocal(), walSource);
// Add filters
IoFilterGroup *filterGroup = ioFilterGroupNew();
// Is the file compressible during the copy?
bool compressible = true;
// If the file will be compressed then add compression filter
if (isSegment && compress)
{
strCat(archiveDestination, "." GZIP_EXT);
ioFilterGroupAdd(filterGroup, gzipCompressNew(compressLevel, false));
ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(source)), gzipCompressNew(compressLevel, false));
compressible = false;
}
// If there is a cipher then add the encrypt filter
if (cipherType != cipherTypeNone)
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL));
ioReadFilterGroupSet(storageReadIo(source), filterGroup);
{
ioFilterGroupAdd(
ioReadFilterGroup(storageReadIo(source)),
cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL));
compressible = false;
}
// Copy the file
storageCopyNP(
source,
storageNewWriteNP(
storageRepoWrite(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strPtr(archiveId), strPtr(archiveDestination))));
storageNewWriteP(
storageRepoWrite(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strPtr(archiveId), strPtr(archiveDestination)),
.compressible = compressible));
}
}
MEM_CONTEXT_TEMP_END();

View File

@ -229,7 +229,7 @@ pageChecksumNew(unsigned int segmentNo, unsigned int segmentPageTotal, size_t pa
driver->valid = true;
driver->align = true;
this = ioFilterNewP(PAGE_CHECKSUM_FILTER_TYPE_STR, driver, .in = pageChecksumProcess, .result = pageChecksumResult);
this = ioFilterNewP(PAGE_CHECKSUM_FILTER_TYPE_STR, driver, NULL, .in = pageChecksumProcess, .result = pageChecksumResult);
}
MEM_CONTEXT_NEW_END();

View File

@ -55,7 +55,8 @@ restoreFile(
// Was the file copied?
bool result = true;
// Create destination file. We may not use this but it makes sense to only create it in one place if we do.
// Is the file compressible during the copy?
bool compressible = true;
MEM_CONTEXT_TEMP_BEGIN()
{
@ -81,13 +82,12 @@ restoreFile(
if (info.size == pgFileSize)
{
// Generate checksum for the file if size is not zero
IoFilterGroup *filterGroup = ioFilterGroupNew();
IoRead *read = NULL;
if (info.size != 0)
{
IoRead *read = storageReadIo(storageNewReadNP(storagePgWrite(), pgFile));
ioFilterGroupAdd(filterGroup, cryptoHashNew(HASH_TYPE_SHA1_STR));
ioReadFilterGroupSet(read, filterGroup);
read = storageReadIo(storageNewReadNP(storagePgWrite(), pgFile));
ioFilterGroupAdd(ioReadFilterGroup(read), cryptoHashNew(HASH_TYPE_SHA1_STR));
Buffer *buffer = bufNew(ioBufferSize());
ioReadOpen(read);
@ -104,7 +104,8 @@ restoreFile(
// If size and checksum are equal then no need to copy the file
if (pgFileSize == 0 ||
strEq(pgFileChecksum, varStr(ioFilterGroupResult(filterGroup, CRYPTO_HASH_FILTER_TYPE_STR))))
strEq(
pgFileChecksum, varStr(ioFilterGroupResult(ioReadFilterGroup(read), CRYPTO_HASH_FILTER_TYPE_STR))))
{
// Even if hash/size are the same set the time back to backup time. This helps with unit testing, but
// also presents a pristine version of the database after restore.
@ -153,15 +154,21 @@ restoreFile(
// Else perform the copy
else
{
IoFilterGroup *filterGroup = ioFilterGroupNew();
IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(pgFileWrite));
// Add decryption filter
if (cipherPass != NULL)
{
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeDecrypt, cipherTypeAes256Cbc, BUFSTR(cipherPass), NULL));
compressible = false;
}
// Add decompression filter
if (repoFileCompressed)
{
ioFilterGroupAdd(filterGroup, gzipDecompressNew(false));
compressible = false;
}
// Add sha1 filter
ioFilterGroupAdd(filterGroup, cryptoHashNew(HASH_TYPE_SHA1_STR));
@ -169,15 +176,14 @@ restoreFile(
// Add size filter
ioFilterGroupAdd(filterGroup, ioSizeNew());
ioWriteFilterGroupSet(storageWriteIo(pgFileWrite), filterGroup);
// Copy file
storageCopyNP(
storageNewReadNP(
storageNewReadP(
storageRepo(),
strNewFmt(
STORAGE_REPO_BACKUP "/%s/%s%s", strPtr(repoFileReference), strPtr(repoFile),
repoFileCompressed ? "." GZIP_EXT : "")),
repoFileCompressed ? "." GZIP_EXT : ""),
.compressible = compressible),
pgFileWrite);
// Validate checksum

View File

@ -17,8 +17,7 @@ Gzip Compress
/***********************************************************************************************************************************
Filter type constant
***********************************************************************************************************************************/
#define GZIP_COMPRESS_FILTER_TYPE "gzipCompress"
STRING_STATIC(GZIP_COMPRESS_FILTER_TYPE_STR, GZIP_COMPRESS_FILTER_TYPE);
STRING_EXTERN(GZIP_COMPRESS_FILTER_TYPE_STR, GZIP_COMPRESS_FILTER_TYPE);
/***********************************************************************************************************************************
Object type
@ -185,12 +184,23 @@ gzipCompressNew(int level, bool raw)
// Set free callback to ensure gzip context is freed
memContextCallbackSet(driver->memContext, gzipCompressFreeResource, driver);
// Create param list
VariantList *paramList = varLstNew();
varLstAdd(paramList, varNewInt(level));
varLstAdd(paramList, varNewBool(raw));
// Create filter interface
this = ioFilterNewP(
GZIP_COMPRESS_FILTER_TYPE_STR, driver, .done = gzipCompressDone, .inOut = gzipCompressProcess,
GZIP_COMPRESS_FILTER_TYPE_STR, driver, paramList, .done = gzipCompressDone, .inOut = gzipCompressProcess,
.inputSame = gzipCompressInputSame);
}
MEM_CONTEXT_NEW_END();
FUNCTION_LOG_RETURN(IO_FILTER, this);
}
IoFilter *
gzipCompressNewVar(const VariantList *paramList)
{
return gzipCompressNew(varIntForce(varLstGet(paramList, 0)), varBool(varLstGet(paramList, 1)));
}

View File

@ -8,9 +8,16 @@ Compress IO using the gzip format.
#include "common/io/filter/filter.h"
/***********************************************************************************************************************************
Filter type constant
***********************************************************************************************************************************/
#define GZIP_COMPRESS_FILTER_TYPE "gzipCompress"
STRING_DECLARE(GZIP_COMPRESS_FILTER_TYPE_STR);
/***********************************************************************************************************************************
Constructor
***********************************************************************************************************************************/
IoFilter *gzipCompressNew(int level, bool raw);
IoFilter *gzipCompressNewVar(const VariantList *paramList);
#endif

View File

@ -17,8 +17,7 @@ Gzip Decompress
/***********************************************************************************************************************************
Filter type constant
***********************************************************************************************************************************/
#define GZIP_DECOMPRESS_FILTER_TYPE "gzipDecompress"
STRING_STATIC(GZIP_DECOMPRESS_FILTER_TYPE_STR, GZIP_DECOMPRESS_FILTER_TYPE);
STRING_EXTERN(GZIP_DECOMPRESS_FILTER_TYPE_STR, GZIP_DECOMPRESS_FILTER_TYPE);
/***********************************************************************************************************************************
Object type
@ -162,12 +161,22 @@ gzipDecompressNew(bool raw)
// Set free callback to ensure gzip context is freed
memContextCallbackSet(driver->memContext, gzipDecompressFreeResource, driver);
// Create param list
VariantList *paramList = varLstNew();
varLstAdd(paramList, varNewBool(raw));
// Create filter interface
this = ioFilterNewP(
GZIP_DECOMPRESS_FILTER_TYPE_STR, driver, .done = gzipDecompressDone, .inOut = gzipDecompressProcess,
GZIP_DECOMPRESS_FILTER_TYPE_STR, driver, paramList, .done = gzipDecompressDone, .inOut = gzipDecompressProcess,
.inputSame = gzipDecompressInputSame);
}
MEM_CONTEXT_NEW_END();
FUNCTION_LOG_RETURN(IO_FILTER, this);
}
IoFilter *
gzipDecompressNewVar(const VariantList *paramList)
{
return gzipDecompressNew(varBool(varLstGet(paramList, 0)));
}

View File

@ -8,9 +8,16 @@ Decompress IO from the gzip format.
#include "common/io/filter/filter.h"
/***********************************************************************************************************************************
Filter type constant
***********************************************************************************************************************************/
#define GZIP_DECOMPRESS_FILTER_TYPE "gzipDecompress"
STRING_DECLARE(GZIP_DECOMPRESS_FILTER_TYPE_STR);
/***********************************************************************************************************************************
Constructor
***********************************************************************************************************************************/
IoFilter *gzipDecompressNew(bool raw);
IoFilter *gzipDecompressNewVar(const VariantList *paramList);
#endif

View File

@ -437,7 +437,7 @@ cipherBlockNew(CipherMode mode, CipherType cipherType, const Buffer *pass, const
// Create filter interface
this = ioFilterNewP(
CIPHER_BLOCK_FILTER_TYPE_STR, driver, .done = cipherBlockDone, .inOut = cipherBlockProcess,
CIPHER_BLOCK_FILTER_TYPE_STR, driver, NULL, .done = cipherBlockDone, .inOut = cipherBlockProcess,
.inputSame = cipherBlockInputSame);
}
MEM_CONTEXT_NEW_END();

View File

@ -164,7 +164,7 @@ cryptoHashNew(const String *type)
cryptoError(!EVP_DigestInit_ex(driver->hashContext, driver->hashType, NULL), "unable to initialize hash context");
// Create filter interface
this = ioFilterNewP(CRYPTO_HASH_FILTER_TYPE_STR, driver, .in = cryptoHashProcess, .result = cryptoHashResult);
this = ioFilterNewP(CRYPTO_HASH_FILTER_TYPE_STR, driver, NULL, .in = cryptoHashProcess, .result = cryptoHashResult);
}
MEM_CONTEXT_NEW_END();

View File

@ -121,7 +121,7 @@ ioBufferNew(void)
IoBuffer *driver = memNew(sizeof(IoBuffer));
driver->memContext = memContextCurrent();
this = ioFilterNewP(BUFFER_FILTER_TYPE_STR, driver, .inOut = ioBufferProcess, .inputSame = ioBufferInputSame);
this = ioFilterNewP(BUFFER_FILTER_TYPE_STR, driver, NULL, .inOut = ioBufferProcess, .inputSame = ioBufferInputSame);
}
MEM_CONTEXT_NEW_END();

View File

@ -17,6 +17,7 @@ struct IoFilter
MemContext *memContext; // Mem context of filter
const String *type; // Filter type
void *driver; // Filter driver
const VariantList *paramList; // Filter parameters
IoFilterInterface interface; // Filter interface
bool flushing; // Has the filter started flushing?
@ -30,11 +31,12 @@ New object
Allocations will be in the memory context of the caller.
***********************************************************************************************************************************/
IoFilter *
ioFilterNew(const String *type, void *driver, IoFilterInterface interface)
ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterInterface interface)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STRING, type);
FUNCTION_LOG_PARAM_P(VOID, driver);
FUNCTION_LOG_PARAM(VARIANT_LIST, paramList);
FUNCTION_LOG_PARAM(IO_FILTER_INTERFACE, interface);
FUNCTION_LOG_END();
@ -53,6 +55,7 @@ ioFilterNew(const String *type, void *driver, IoFilterInterface interface)
this->memContext = memContextCurrent();
this->type = type;
this->driver = driver;
this->paramList = paramList;
this->interface = interface;
FUNCTION_LOG_RETURN(IO_FILTER, this);
@ -202,7 +205,7 @@ ioFilterInterface(const IoFilter *this)
/***********************************************************************************************************************************
Does filter produce output?
All InOut filters produce output.
All In filters produce output.
***********************************************************************************************************************************/
bool
ioFilterOutput(const IoFilter *this)
@ -216,6 +219,21 @@ ioFilterOutput(const IoFilter *this)
FUNCTION_TEST_RETURN(this->interface.inOut != NULL);
}
/***********************************************************************************************************************************
List of filter parameters
***********************************************************************************************************************************/
const VariantList *
ioFilterParamList(const IoFilter *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(IO_FILTER, this);
FUNCTION_TEST_END();
ASSERT(this != NULL);
FUNCTION_TEST_RETURN(this->paramList);
}
/***********************************************************************************************************************************
Get filter result
***********************************************************************************************************************************/

View File

@ -19,6 +19,7 @@ Each filter has a type that allows it to be identified in the filter list.
#define COMMON_IO_FILTER_FILTER_INTERN_H
#include "common/io/filter/filter.h"
#include "common/type/variantList.h"
/***********************************************************************************************************************************
Constructor
@ -48,10 +49,10 @@ typedef struct IoFilterInterface
Variant *(*result)(void *driver);
} IoFilterInterface;
#define ioFilterNewP(type, driver, ...) \
ioFilterNew(type, driver, (IoFilterInterface){__VA_ARGS__})
#define ioFilterNewP(type, driver, paramList, ...) \
ioFilterNew(type, driver, paramList, (IoFilterInterface){__VA_ARGS__})
IoFilter *ioFilterNew(const String *type, void *driver, IoFilterInterface);
IoFilter *ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterInterface);
/***********************************************************************************************************************************
Functions
@ -68,6 +69,7 @@ void *ioFilterDriver(IoFilter *this);
bool ioFilterInputSame(const IoFilter *this);
const IoFilterInterface *ioFilterInterface(const IoFilter *this);
bool ioFilterOutput(const IoFilter *this);
const VariantList *ioFilterParamList(const IoFilter *this);
/***********************************************************************************************************************************
Macros for function logging

View File

@ -106,7 +106,7 @@ ioFilterGroupAdd(IoFilterGroup *this, IoFilter *filter)
Get a filter
***********************************************************************************************************************************/
static IoFilterData *
ioFilterGroupGet(IoFilterGroup *this, unsigned int filterIdx)
ioFilterGroupGet(const IoFilterGroup *this, unsigned int filterIdx)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this);
@ -136,13 +136,16 @@ ioFilterGroupOpen(IoFilterGroup *this)
{
// If the last filter is not an output filter then add a filter to buffer/copy data. Input filters won't copy to an output
// buffer so we need some way to get the data to the output buffer.
if (lstSize(this->filterList) == 0 || !ioFilterOutput((ioFilterGroupGet(this, lstSize(this->filterList) - 1))->filter))
if (ioFilterGroupSize(this) == 0 ||
!ioFilterOutput((ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->filter))
{
ioFilterGroupAdd(this, ioBufferNew());
}
// Create filter input/output buffers. Input filters do not get an output buffer since they don't produce output.
Buffer **lastOutputBuffer = NULL;
for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++)
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
{
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
@ -163,7 +166,7 @@ ioFilterGroupOpen(IoFilterGroup *this)
// If this is not the last output filter then create a new output buffer for it. The output buffer for the last filter
// will be provided to the process function.
if (ioFilterOutput(filterData->filter) && filterIdx < lstSize(this->filterList) - 1)
if (ioFilterOutput(filterData->filter) && filterIdx < ioFilterGroupSize(this) - 1)
{
filterData->output = bufNew(ioBufferSize());
lastOutputBuffer = &filterData->output;
@ -207,7 +210,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
// Assign input and output buffers
this->input = input;
(ioFilterGroupGet(this, lstSize(this->filterList) - 1))->output = output;
(ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->output = output;
//
do
@ -220,7 +223,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
if (this->inputSame)
{
this->inputSame = false;
filterIdx = lstSize(this->filterList);
filterIdx = ioFilterGroupSize(this);
do
{
@ -242,7 +245,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
// Process forward from the filter that has input to process. This may be a filter that needs the same input or it may be
// new input for the first filter.
for (; filterIdx < lstSize(this->filterList); filterIdx++)
for (; filterIdx < ioFilterGroupSize(this); filterIdx++)
{
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
@ -288,7 +291,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
this->done = true;
this->inputSame = false;
for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++)
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
{
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
@ -321,7 +324,7 @@ ioFilterGroupClose(IoFilterGroup *this)
ASSERT(this != NULL);
ASSERT(this->opened && !this->closed);
for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++)
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
{
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
const Variant *filterResult = ioFilterResult(filterData->filter);
@ -350,25 +353,6 @@ ioFilterGroupClose(IoFilterGroup *this)
FUNCTION_LOG_RETURN_VOID();
}
/***********************************************************************************************************************************
Move the object to a new context
***********************************************************************************************************************************/
IoFilterGroup *
ioFilterGroupMove(IoFilterGroup *this, MemContext *parentNew)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this);
FUNCTION_TEST_PARAM(MEM_CONTEXT, parentNew);
FUNCTION_TEST_END();
ASSERT(parentNew != NULL);
if (this != NULL)
memContextMove(this->memContext, parentNew);
FUNCTION_TEST_RETURN(this);
}
/***********************************************************************************************************************************
Is the filter group done processing?
***********************************************************************************************************************************/
@ -403,6 +387,37 @@ ioFilterGroupInputSame(const IoFilterGroup *this)
FUNCTION_TEST_RETURN(this->inputSame);
}
/***********************************************************************************************************************************
Get all filters and parameters so they can be passed to a remote
***********************************************************************************************************************************/
Variant *
ioFilterGroupParamAll(const IoFilterGroup *this)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(!this->opened);
ASSERT(this->filterList != NULL);
KeyValue *result = kvNew();
MEM_CONTEXT_TEMP_BEGIN()
{
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
{
IoFilter *filter = ioFilterGroupGet(this, filterIdx)->filter;
const VariantList *paramList = ioFilterParamList(filter);
kvAdd(result, VARSTR(ioFilterType(filter)), paramList ? varNewVarLst(paramList) : NULL);
}
}
MEM_CONTEXT_TEMP_END();
FUNCTION_LOG_RETURN(VARIANT, varNewKv(result));
}
/***********************************************************************************************************************************
Get filter results
***********************************************************************************************************************************/
@ -429,11 +444,50 @@ ioFilterGroupResult(const IoFilterGroup *this, const String *filterType)
FUNCTION_LOG_RETURN_CONST(VARIANT, result);
}
/***********************************************************************************************************************************
Get all filter results
***********************************************************************************************************************************/
const Variant *
ioFilterGroupResultAll(const IoFilterGroup *this)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(this->closed);
FUNCTION_LOG_RETURN_CONST(VARIANT, varNewKv(this->filterResult));
}
/***********************************************************************************************************************************
Return total number of filters
***********************************************************************************************************************************/
unsigned int
ioFilterGroupSize(const IoFilterGroup *this)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this);
FUNCTION_TEST_END();
FUNCTION_TEST_RETURN(lstSize(this->filterList));
}
/***********************************************************************************************************************************
Render as string for logging
***********************************************************************************************************************************/
String *
ioFilterGroupToLog(const IoFilterGroup *this)
{
return strNewFmt("{inputSame: %s, done: %s}", cvtBoolToConstZ(this->inputSame), cvtBoolToConstZ(this->done));
return strNewFmt(
"{inputSame: %s, done: %s"
#ifdef DEBUG
", opened %s, flushing %s, closed %s"
#endif
"}",
cvtBoolToConstZ(this->inputSame), cvtBoolToConstZ(this->done)
#ifdef DEBUG
, cvtBoolToConstZ(this->opened), cvtBoolToConstZ(this->flushing), cvtBoolToConstZ(this->closed)
#endif
);
}

View File

@ -34,14 +34,15 @@ void ioFilterGroupOpen(IoFilterGroup *this);
void ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output);
void ioFilterGroupClose(IoFilterGroup *this);
IoFilterGroup *ioFilterGroupMove(IoFilterGroup *this, MemContext *parentNew);
/***********************************************************************************************************************************
Getters
***********************************************************************************************************************************/
bool ioFilterGroupDone(const IoFilterGroup *this);
bool ioFilterGroupInputSame(const IoFilterGroup *this);
Variant *ioFilterGroupParamAll(const IoFilterGroup *this);
const Variant *ioFilterGroupResult(const IoFilterGroup *this, const String *filterType);
const Variant *ioFilterGroupResultAll(const IoFilterGroup *this);
unsigned int ioFilterGroupSize(const IoFilterGroup *this);
/***********************************************************************************************************************************
Destructor

View File

@ -94,7 +94,7 @@ ioSizeNew(void)
IoSize *driver = memNew(sizeof(IoSize));
driver->memContext = memContextCurrent();
this = ioFilterNewP(SIZE_FILTER_TYPE_STR, driver, .in = ioSizeProcess, .result = ioSizeResult);
this = ioFilterNewP(SIZE_FILTER_TYPE_STR, driver, NULL, .in = ioSizeProcess, .result = ioSizeResult);
}
MEM_CONTEXT_NEW_END();

View File

@ -56,6 +56,7 @@ ioReadNew(void *driver, IoReadInterface interface)
this->memContext = memContextCurrent();
this->driver = driver;
this->interface = interface;
this->filterGroup = ioFilterGroupNew();
this->input = bufNew(ioBufferSize());
}
MEM_CONTEXT_NEW_END();
@ -75,25 +76,14 @@ ioReadOpen(IoRead *this)
ASSERT(this != NULL);
ASSERT(!this->opened && !this->closed);
ASSERT(ioFilterGroupSize(this->filterGroup) == 0 || !ioReadBlock(this));
// Open if the driver has an open function
bool result = this->interface.open != NULL ? this->interface.open(this->driver) : true;
// Only open the filter group if the read was opened
if (result)
{
// If no filter group exists create one to do buffering
if (this->filterGroup == NULL)
{
MEM_CONTEXT_BEGIN(this->memContext)
{
this->filterGroup = ioFilterGroupNew();
}
MEM_CONTEXT_END();
}
ioFilterGroupOpen(this->filterGroup);
}
#ifdef DEBUG
this->opened = result;
@ -365,9 +355,7 @@ ioReadEof(const IoRead *this)
}
/***********************************************************************************************************************************
Get/set filters
Filters must be set before open and cannot be reset.
Get filter group if filters need to be added
***********************************************************************************************************************************/
IoFilterGroup *
ioReadFilterGroup(const IoRead *this)
@ -381,25 +369,6 @@ ioReadFilterGroup(const IoRead *this)
FUNCTION_TEST_RETURN(this->filterGroup);
}
IoRead *
ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(IO_READ, this);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, filterGroup);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(filterGroup != NULL);
ASSERT(this->filterGroup == NULL);
ASSERT(!this->opened && !this->closed);
ASSERT(!ioReadBlock(this));
this->filterGroup = ioFilterGroupMove(filterGroup, this->memContext);
FUNCTION_LOG_RETURN(IO_READ, this);
}
/***********************************************************************************************************************************
Handle (file descriptor) for the read object

View File

@ -34,7 +34,6 @@ Getters/Setters
bool ioReadBlock(const IoRead *this);
bool ioReadEof(const IoRead *this);
IoFilterGroup *ioReadFilterGroup(const IoRead *this);
IoRead *ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup);
int ioReadHandle(const IoRead *this);
/***********************************************************************************************************************************

View File

@ -24,7 +24,7 @@ struct IoWrite
Buffer *output; // Output buffer
#ifdef DEBUG
bool filterGroupSet; // Was an IoFilterGroup set?
bool filterGroupSet; // Were filters set?
bool opened; // Has the io been opened?
bool closed; // Has the io been closed?
#endif
@ -54,6 +54,7 @@ ioWriteNew(void *driver, IoWriteInterface interface)
this->memContext = memContextCurrent();
this->driver = driver;
this->interface = interface;
this->filterGroup = ioFilterGroupNew();
this->output = bufNew(ioBufferSize());
}
MEM_CONTEXT_NEW_END();
@ -77,16 +78,12 @@ ioWriteOpen(IoWrite *this)
if (this->interface.open != NULL)
this->interface.open(this->driver);
// If no filter group exists create one to do buffering
if (this->filterGroup == NULL)
{
MEM_CONTEXT_BEGIN(this->memContext)
{
this->filterGroup = ioFilterGroupNew();
}
MEM_CONTEXT_END();
}
// Track whether filters were added to prevent flush() from being called later since flush() won't work with most filters
#ifdef DEBUG
this->filterGroupSet = ioFilterGroupSize(this->filterGroup) > 0;
#endif
// Open the filter group
ioFilterGroupOpen(this->filterGroup);
#ifdef DEBUG
@ -287,29 +284,6 @@ ioWriteFilterGroup(const IoWrite *this)
FUNCTION_TEST_RETURN(this->filterGroup);
}
IoWrite *
ioWriteFilterGroupSet(IoWrite *this, IoFilterGroup *filterGroup)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(IO_WRITE, this);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, filterGroup);
FUNCTION_LOG_END();
ASSERT(this != NULL);
ASSERT(filterGroup != NULL);
ASSERT(this->filterGroup == NULL);
ASSERT(!this->opened && !this->closed);
// Track whether a filter group was set to prevent flush() from being called later
#ifdef DEBUG
this->filterGroupSet = true;
#endif
this->filterGroup = ioFilterGroupMove(filterGroup, this->memContext);
FUNCTION_LOG_RETURN(IO_WRITE, this);
}
/***********************************************************************************************************************************
Handle (file descriptor) for the write object

View File

@ -34,7 +34,6 @@ void ioWriteClose(IoWrite *this);
Getters/Setters
***********************************************************************************************************************************/
IoFilterGroup *ioWriteFilterGroup(const IoWrite *this);
IoWrite *ioWriteFilterGroupSet(IoWrite *this, IoFilterGroup *filterGroup);
int ioWriteHandle(const IoWrite *this);
/***********************************************************************************************************************************

View File

@ -142,13 +142,13 @@ infoLoad(Info *this, const Storage *storage, const String *fileName, bool copyFi
const String *fileNameExt = copyFile ? strNewFmt("%s" INFO_COPY_EXT, strPtr(fileName)) : fileName;
// Attempt to load the file
StorageRead *infoRead = storageNewReadNP(storage, fileNameExt);
StorageRead *infoRead = storageNewReadP(storage, fileNameExt, .compressible = cipherType == cipherTypeNone);
if (cipherType != cipherTypeNone)
{
ioReadFilterGroupSet(
storageReadIo(infoRead),
ioFilterGroupAdd(ioFilterGroupNew(), cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(cipherPass), NULL)));
ioFilterGroupAdd(
ioReadFilterGroup(storageReadIo(infoRead)),
cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(cipherPass), NULL));
}
// Load and parse the info file
@ -334,13 +334,12 @@ infoSave(
iniSet(ini, INFO_SECTION_BACKREST_STR, INFO_KEY_CHECKSUM_STR, jsonFromStr(infoHash(ini)));
// Save info file
IoWrite *infoWrite = storageWriteIo(storageNewWriteNP(storage, fileName));
IoWrite *infoWrite = storageWriteIo(storageNewWriteP(storage, fileName, .compressible = cipherType == cipherTypeNone));
if (cipherType != cipherTypeNone)
{
ioWriteFilterGroupSet(
infoWrite,
ioFilterGroupAdd(ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL)));
ioFilterGroupAdd(
ioWriteFilterGroup(infoWrite), cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL));
}
iniSave(ini, infoWrite);

View File

@ -278,7 +278,7 @@ storageRepoGet(const String *type, bool write)
{
result = storageRemoteNew(
STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, write, storageRepoPathExpression,
protocolRemoteGet(protocolStorageTypeRepo));
protocolRemoteGet(protocolStorageTypeRepo), cfgOptionUInt(cfgOptCompressLevelNetwork));
}
// Use CIFS storage
else if (strEqZ(type, STORAGE_TYPE_CIFS))

View File

@ -394,7 +394,7 @@ storagePosixMove(THIS_VOID, StorageRead *source, StorageWrite *destination)
New file read object
***********************************************************************************************************************************/
static StorageRead *
storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing)
storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible)
{
THIS(StoragePosix);
@ -402,6 +402,7 @@ storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing)
FUNCTION_LOG_PARAM(STORAGE_POSIX, this);
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
(void)compressible;
FUNCTION_LOG_END();
ASSERT(this != NULL);
@ -416,7 +417,7 @@ New file write object
static StorageWrite *
storagePosixNewWrite(
THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified,
bool createPath, bool syncFile, bool syncPath, bool atomic)
bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible)
{
THIS(StoragePosix);
@ -432,6 +433,7 @@ storagePosixNewWrite(
FUNCTION_LOG_PARAM(BOOL, syncFile);
FUNCTION_LOG_PARAM(BOOL, syncPath);
FUNCTION_LOG_PARAM(BOOL, atomic);
(void)compressible;
FUNCTION_LOG_END();
ASSERT(this != NULL);

View File

@ -14,6 +14,8 @@ typedef struct StorageReadInterface
{
const String * type;
const String * name;
bool compressible; // Is this file compressible?
unsigned int compressLevel; // Level to use for compression
bool ignoreMissing;
IoReadInterface ioInterface;
} StorageReadInterface;

View File

@ -3,6 +3,8 @@ Remote Storage Protocol Handler
***********************************************************************************************************************************/
#include "build.auto.h"
#include "common/compress/gzip/compress.h"
#include "common/compress/gzip/decompress.h"
#include "common/debug.h"
#include "common/io/io.h"
#include "common/log.h"
@ -40,6 +42,38 @@ static struct
RegExp *blockRegExp; // Regular expression to check block messages
} storageRemoteProtocolLocal;
/***********************************************************************************************************************************
Set filter group based on passed filters
***********************************************************************************************************************************/
static void
storageRemoteFilterGroup(IoFilterGroup *filterGroup, const Variant *filterList)
{
FUNCTION_TEST_BEGIN();
FUNCTION_TEST_PARAM(IO_FILTER_GROUP, filterGroup);
FUNCTION_TEST_PARAM(VARIANT, filterList);
FUNCTION_TEST_END();
ASSERT(filterGroup != NULL);
ASSERT(filterList != NULL);
const VariantList *filterKeyList = kvKeyList(varKv(filterList));
for (unsigned int filterIdx = 0; filterIdx < varLstSize(filterKeyList); filterIdx++)
{
const String *filterKey = varStr(varLstGet(filterKeyList, filterIdx));
const VariantList *filterParam = varVarLst(kvGet(varKv(filterList), varLstGet(filterKeyList, filterIdx)));
if (strEq(filterKey, GZIP_COMPRESS_FILTER_TYPE_STR))
ioFilterGroupAdd(filterGroup, gzipCompressNewVar(filterParam));
else if (strEq(filterKey, GZIP_DECOMPRESS_FILTER_TYPE_STR))
ioFilterGroupAdd(filterGroup, gzipDecompressNewVar(filterParam));
else
THROW_FMT(AssertError, "unable to add filter '%s'", strPtr(filterKey));
}
FUNCTION_TEST_RETURN_VOID();
}
/***********************************************************************************************************************************
Process storage protocol requests
***********************************************************************************************************************************/
@ -87,7 +121,10 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto
// Create the read object
IoRead *fileRead = storageReadIo(
interface.newRead(
driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varBool(varLstGet(paramList, 1))));
driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varBool(varLstGet(paramList, 1)), false));
// Set filter group based on passed filters
storageRemoteFilterGroup(ioReadFilterGroup(fileRead), varLstGet(paramList, 2));
// Check if the file exists
bool exists = ioReadOpen(fileRead);
@ -127,7 +164,10 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto
driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varUIntForce(varLstGet(paramList, 1)),
varUIntForce(varLstGet(paramList, 2)), varStr(varLstGet(paramList, 3)), varStr(varLstGet(paramList, 4)),
(time_t)varIntForce(varLstGet(paramList, 5)), varBool(varLstGet(paramList, 6)),
varBool(varLstGet(paramList, 7)), varBool(varLstGet(paramList, 8)), varBool(varLstGet(paramList, 9))));
varBool(varLstGet(paramList, 7)), varBool(varLstGet(paramList, 8)), varBool(varLstGet(paramList, 9)), false));
// Set filter group based on passed filters
storageRemoteFilterGroup(ioWriteFilterGroup(fileWrite), varLstGet(paramList, 10));
// Open file
ioWriteOpen(fileWrite);

View File

@ -6,6 +6,8 @@ Remote Storage Read
#include <fcntl.h>
#include <unistd.h>
#include "common/compress/gzip/compress.h"
#include "common/compress/gzip/decompress.h"
#include "common/debug.h"
#include "common/io/read.intern.h"
#include "common/log.h"
@ -24,10 +26,15 @@ typedef struct StorageReadRemote
MemContext *memContext; // Object mem context
StorageReadInterface interface; // Interface
StorageRemote *storage; // Storage that created this object
StorageRead *read; // Storage read interface
ProtocolClient *client; // Protocol client for requests
size_t remaining; // Bytes remaining to be read in block
bool eof; // Has the file reached eof?
#ifdef DEBUG
uint64_t protocolReadBytes; // How many bytes were read from the protocol layer?
#endif
} StorageReadRemote;
/***********************************************************************************************************************************
@ -56,9 +63,25 @@ storageReadRemoteOpen(THIS_VOID)
MEM_CONTEXT_TEMP_BEGIN()
{
IoFilterGroup *filterGroup = ioFilterGroupNew();
// If the file is compressible add compression filter on the remote
if (this->interface.compressible)
ioFilterGroupAdd(filterGroup, gzipCompressNew((int)this->interface.compressLevel, true));
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR);
protocolCommandParamAdd(command, VARSTR(this->interface.name));
protocolCommandParamAdd(command, VARBOOL(this->interface.ignoreMissing));
protocolCommandParamAdd(command, ioFilterGroupParamAll(filterGroup));
// If the file is compressible add decompression filter locally
if (this->interface.compressible)
{
// Since we can't insert filters yet we'll just error if there are already filters in the list
CHECK(ioFilterGroupSize(ioReadFilterGroup(storageReadIo(this->read))) == 0);
ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(this->read)), gzipDecompressNew(true));
}
result = varBool(protocolClientExecute(this->client, command, true));
}
@ -100,6 +123,10 @@ storageReadRemote(THIS_VOID, Buffer *buffer, bool block)
if (this->remaining == 0)
this->eof = true;
#ifdef DEBUG
this->protocolReadBytes += this->remaining;
#endif
}
MEM_CONTEXT_TEMP_END();
}
@ -147,30 +174,36 @@ storageReadRemoteEof(THIS_VOID)
New object
***********************************************************************************************************************************/
StorageRead *
storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing)
storageReadRemoteNew(
StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
unsigned int compressLevel)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage);
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client);
FUNCTION_LOG_PARAM(STRING, name);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(BOOL, compressible);
FUNCTION_LOG_PARAM(UINT, compressLevel);
FUNCTION_LOG_END();
ASSERT(storage != NULL);
ASSERT(client != NULL);
ASSERT(name != NULL);
StorageRead *this = NULL;
StorageReadRemote *this = NULL;
MEM_CONTEXT_NEW_BEGIN("StorageReadRemote")
{
StorageReadRemote *driver = memNew(sizeof(StorageReadRemote));
driver->memContext = MEM_CONTEXT_NEW();
this = memNew(sizeof(StorageReadRemote));
this->memContext = MEM_CONTEXT_NEW();
driver->interface = (StorageReadInterface)
this->interface = (StorageReadInterface)
{
.type = STORAGE_REMOTE_TYPE_STR,
.name = strDup(name),
.compressible = compressible,
.compressLevel = compressLevel,
.ignoreMissing = ignoreMissing,
.ioInterface = (IoReadInterface)
@ -181,12 +214,13 @@ storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const Strin
},
};
driver->storage = storage;
driver->client = client;
this->storage = storage;
this->client = client;
this = storageReadNew(driver, &driver->interface);
this->read = storageReadNew(this, &this->interface);
}
MEM_CONTEXT_NEW_END();
FUNCTION_LOG_RETURN(STORAGE_READ, this);
ASSERT(this != NULL);
FUNCTION_LOG_RETURN(STORAGE_READ, this->read);
}

View File

@ -11,6 +11,8 @@ Remote Storage Read
/***********************************************************************************************************************************
Constructor
***********************************************************************************************************************************/
StorageRead *storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing);
StorageRead *storageReadRemoteNew(
StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
unsigned int compressLevel);
#endif

View File

@ -24,6 +24,7 @@ struct StorageRemote
{
MemContext *memContext;
ProtocolClient *client; // Protocol client
unsigned int compressLevel; // Protocol compression level
};
/***********************************************************************************************************************************
@ -113,7 +114,7 @@ storageRemoteList(THIS_VOID, const String *path, const String *expression)
New file read object
***********************************************************************************************************************************/
static StorageRead *
storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing)
storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible)
{
THIS(StorageRemote);
@ -121,11 +122,15 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing)
FUNCTION_LOG_PARAM(STORAGE_REMOTE, this);
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
FUNCTION_LOG_PARAM(BOOL, compressible);
FUNCTION_LOG_END();
ASSERT(this != NULL);
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadRemoteNew(this, this->client, file, ignoreMissing));
FUNCTION_LOG_RETURN(
STORAGE_READ,
storageReadRemoteNew(
this, this->client, file, ignoreMissing, this->compressLevel > 0 ? compressible : false, this->compressLevel));
}
/***********************************************************************************************************************************
@ -134,7 +139,7 @@ New file write object
static StorageWrite *
storageRemoteNewWrite(
THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified,
bool createPath, bool syncFile, bool syncPath, bool atomic)
bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible)
{
THIS(StorageRemote);
@ -150,6 +155,7 @@ storageRemoteNewWrite(
FUNCTION_LOG_PARAM(BOOL, syncFile);
FUNCTION_LOG_PARAM(BOOL, syncPath);
FUNCTION_LOG_PARAM(BOOL, atomic);
FUNCTION_LOG_PARAM(BOOL, compressible);
FUNCTION_LOG_END();
ASSERT(this != NULL);
@ -158,7 +164,8 @@ storageRemoteNewWrite(
FUNCTION_LOG_RETURN(
STORAGE_WRITE,
storageWriteRemoteNew(
this, this->client, file, modeFile, modePath, user, group, timeModified, createPath, syncFile, syncPath, atomic));
this, this->client, file, modeFile, modePath, user, group, timeModified, createPath, syncFile, syncPath, atomic,
this->compressLevel > 0 ? compressible : false, this->compressLevel));
}
/***********************************************************************************************************************************
@ -319,7 +326,8 @@ New object
***********************************************************************************************************************************/
Storage *
storageRemoteNew(
mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client)
mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client,
unsigned int compressLevel)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(MODE, modeFile);
@ -327,6 +335,7 @@ storageRemoteNew(
FUNCTION_LOG_PARAM(BOOL, write);
FUNCTION_LOG_PARAM(FUNCTIONP, pathExpressionFunction);
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client);
FUNCTION_LOG_PARAM(UINT, compressLevel);
FUNCTION_LOG_END();
ASSERT(modeFile != 0);
@ -340,6 +349,7 @@ storageRemoteNew(
StorageRemote *driver = memNew(sizeof(StorageRemote));
driver->memContext = MEM_CONTEXT_NEW();
driver->client = client;
driver->compressLevel = compressLevel;
uint64_t feature = 0;

View File

@ -17,6 +17,7 @@ Storage type
Constructor
***********************************************************************************************************************************/
Storage *storageRemoteNew(
mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client);
mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client,
unsigned int compressLevel);
#endif

View File

@ -3,6 +3,8 @@ Remote Storage File write
***********************************************************************************************************************************/
#include "build.auto.h"
#include "common/compress/gzip/compress.h"
#include "common/compress/gzip/decompress.h"
#include "common/debug.h"
#include "common/io/write.intern.h"
#include "common/log.h"
@ -23,7 +25,12 @@ typedef struct StorageWriteRemote
MemContext *memContext; // Object mem context
StorageWriteInterface interface; // Interface
StorageRemote *storage; // Storage that created this object
StorageWrite *write; // Storage write interface
ProtocolClient *client; // Protocol client to make requests with
#ifdef DEBUG
uint64_t protocolWriteBytes; // How many bytes were written to the protocol layer?
#endif
} StorageWriteRemote;
/***********************************************************************************************************************************
@ -61,6 +68,12 @@ storageWriteRemoteOpen(THIS_VOID)
MEM_CONTEXT_TEMP_BEGIN()
{
IoFilterGroup *filterGroup = ioFilterGroupNew();
// If the file is compressible add decompression filter on the remote
if (this->interface.compressible)
ioFilterGroupAdd(filterGroup, gzipDecompressNew(true));
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR);
protocolCommandParamAdd(command, VARSTR(this->interface.name));
protocolCommandParamAdd(command, VARUINT(this->interface.modeFile));
@ -72,6 +85,14 @@ storageWriteRemoteOpen(THIS_VOID)
protocolCommandParamAdd(command, VARBOOL(this->interface.syncFile));
protocolCommandParamAdd(command, VARBOOL(this->interface.syncPath));
protocolCommandParamAdd(command, VARBOOL(this->interface.atomic));
protocolCommandParamAdd(command, ioFilterGroupParamAll(filterGroup));
// If the file is compressible add compression filter locally
if (this->interface.compressible)
{
ioFilterGroupAdd(
ioWriteFilterGroup(storageWriteIo(this->write)), gzipCompressNew((int)this->interface.compressLevel, true));
}
protocolClientExecute(this->client, command, false);
@ -103,6 +124,10 @@ storageWriteRemote(THIS_VOID, const Buffer *buffer)
ioWrite(protocolClientIoWrite(this->client), buffer);
ioWriteFlush(protocolClientIoWrite(this->client));
#ifdef DEBUG
this->protocolWriteBytes += bufUsed(buffer);
#endif
FUNCTION_LOG_RETURN_VOID();
}
@ -140,7 +165,8 @@ Create a new file
StorageWrite *
storageWriteRemoteNew(
StorageRemote *storage, ProtocolClient *client, const String *name, mode_t modeFile, mode_t modePath, const String *user,
const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic)
const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible,
unsigned int compressLevel)
{
FUNCTION_LOG_BEGIN(logLevelTrace);
FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage);
@ -154,6 +180,8 @@ storageWriteRemoteNew(
FUNCTION_LOG_PARAM(BOOL, syncFile);
FUNCTION_LOG_PARAM(BOOL, syncPath);
FUNCTION_LOG_PARAM(BOOL, atomic);
FUNCTION_LOG_PARAM(BOOL, compressible);
FUNCTION_LOG_PARAM(UINT, compressLevel);
FUNCTION_LOG_END();
ASSERT(storage != NULL);
@ -162,18 +190,20 @@ storageWriteRemoteNew(
ASSERT(modeFile != 0);
ASSERT(modePath != 0);
StorageWrite *this = NULL;
StorageWriteRemote *this = NULL;
MEM_CONTEXT_NEW_BEGIN("StorageWriteRemote")
{
StorageWriteRemote *driver = memNew(sizeof(StorageWriteRemote));
driver->memContext = MEM_CONTEXT_NEW();
this = memNew(sizeof(StorageWriteRemote));
this->memContext = MEM_CONTEXT_NEW();
driver->interface = (StorageWriteInterface)
this->interface = (StorageWriteInterface)
{
.type = STORAGE_REMOTE_TYPE_STR,
.name = strDup(name),
.atomic = atomic,
.compressible = compressible,
.compressLevel = compressLevel,
.createPath = createPath,
.group = strDup(group),
.modeFile = modeFile,
@ -191,12 +221,13 @@ storageWriteRemoteNew(
},
};
driver->storage = storage;
driver->client = client;
this->storage = storage;
this->client = client;
this = storageWriteNew(driver, &driver->interface);
this->write = storageWriteNew(this, &this->interface);
}
MEM_CONTEXT_NEW_END();
FUNCTION_LOG_RETURN(STORAGE_WRITE, this);
ASSERT(this != NULL);
FUNCTION_LOG_RETURN(STORAGE_WRITE, this->write);
}

View File

@ -13,6 +13,7 @@ Constructor
***********************************************************************************************************************************/
StorageWrite *storageWriteRemoteNew(
StorageRemote *storage, ProtocolClient *client, const String *name, mode_t modeFile, mode_t modePath, const String *user,
const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic);
const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible,
unsigned int compressLevel);
#endif

View File

@ -657,7 +657,7 @@ storageS3List(THIS_VOID, const String *path, const String *expression)
New file read object
***********************************************************************************************************************************/
static StorageRead *
storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing)
storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible)
{
THIS(StorageS3);
@ -665,6 +665,7 @@ storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing)
FUNCTION_LOG_PARAM(STORAGE_S3, this);
FUNCTION_LOG_PARAM(STRING, file);
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
(void)compressible;
FUNCTION_LOG_END();
ASSERT(this != NULL);
@ -679,7 +680,7 @@ New file write object
static StorageWrite *
storageS3NewWrite(
THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified,
bool createPath, bool syncFile, bool syncPath, bool atomic)
bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible)
{
THIS(StorageS3);
@ -695,6 +696,7 @@ storageS3NewWrite(
FUNCTION_LOG_PARAM(BOOL, syncFile);
FUNCTION_LOG_PARAM(BOOL, syncPath);
FUNCTION_LOG_PARAM(BOOL, atomic);
(void)compressible;
FUNCTION_LOG_END();
ASSERT(this != NULL);

View File

@ -407,7 +407,7 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p
FUNCTION_LOG_PARAM(STORAGE, this);
FUNCTION_LOG_PARAM(STRING, fileExp);
FUNCTION_LOG_PARAM(BOOL, param.ignoreMissing);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, param.filterGroup);
FUNCTION_LOG_PARAM(BOOL, param.compressible);
FUNCTION_LOG_END();
ASSERT(this != NULL);
@ -416,12 +416,9 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p
MEM_CONTEXT_TEMP_BEGIN()
{
result = this->interface.newRead(this->driver, storagePathNP(this, fileExp), param.ignoreMissing);
if (param.filterGroup != NULL)
ioReadFilterGroupSet(storageReadIo(result), param.filterGroup);
storageReadMove(result, MEM_CONTEXT_OLD());
result = storageReadMove(
this->interface.newRead(this->driver, storagePathNP(this, fileExp), param.ignoreMissing, param.compressible),
MEM_CONTEXT_OLD());
}
MEM_CONTEXT_TEMP_END();
@ -446,7 +443,7 @@ storageNewWrite(const Storage *this, const String *fileExp, StorageNewWriteParam
FUNCTION_LOG_PARAM(BOOL, param.noSyncFile);
FUNCTION_LOG_PARAM(BOOL, param.noSyncPath);
FUNCTION_LOG_PARAM(BOOL, param.noAtomic);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, param.filterGroup);
FUNCTION_LOG_PARAM(BOOL, param.compressible);
FUNCTION_LOG_END();
ASSERT(this != NULL);
@ -456,15 +453,12 @@ storageNewWrite(const Storage *this, const String *fileExp, StorageNewWriteParam
MEM_CONTEXT_TEMP_BEGIN()
{
result = this->interface.newWrite(
result = storageWriteMove(
this->interface.newWrite(
this->driver, storagePathNP(this, fileExp), param.modeFile != 0 ? param.modeFile : this->modeFile,
param.modePath != 0 ? param.modePath : this->modePath, param.user, param.group, param.timeModified, !param.noCreatePath,
!param.noSyncFile, !param.noSyncPath, !param.noAtomic);
if (param.filterGroup != NULL)
ioWriteFilterGroupSet(storageWriteIo(result), param.filterGroup);
storageWriteMove(result, MEM_CONTEXT_OLD());
param.modePath != 0 ? param.modePath : this->modePath, param.user, param.group, param.timeModified,
!param.noCreatePath, !param.noSyncFile, !param.noSyncPath, !param.noAtomic, param.compressible),
MEM_CONTEXT_OLD());
}
MEM_CONTEXT_TEMP_END();

View File

@ -146,7 +146,7 @@ storageNewRead
typedef struct StorageNewReadParam
{
bool ignoreMissing;
IoFilterGroup *filterGroup;
bool compressible;
} StorageNewReadParam;
#define storageNewReadP(this, pathExp, ...) \
@ -170,7 +170,7 @@ typedef struct StorageNewWriteParam
bool noSyncFile;
bool noSyncPath;
bool noAtomic;
IoFilterGroup *filterGroup;
bool compressible;
} StorageNewWriteParam;
#define storageNewWriteP(this, pathExp, ...) \

View File

@ -57,15 +57,16 @@ typedef struct StorageInterface
// Features implemented by the storage driver
uint64_t feature;
bool (*copy)(StorageRead *source, StorageWrite *destination);
bool (*exists)(void *driver, const String *file);
StorageInfo (*info)(void *driver, const String *path, bool followLink);
bool (*infoList)(void *driver, const String *file, StorageInfoListCallback callback, void *callbackData);
StringList *(*list)(void *driver, const String *path, const String *expression);
bool (*move)(void *driver, StorageRead *source, StorageWrite *destination);
StorageRead *(*newRead)(void *driver, const String *file, bool ignoreMissing);
StorageRead *(*newRead)(void *driver, const String *file, bool ignoreMissing, bool compressible);
StorageWrite *(*newWrite)(
void *driver, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group,
time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic);
time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible);
void (*pathCreate)(void *driver, const String *path, bool errorOnExists, bool noParentCreate, mode_t mode);
bool (*pathExists)(void *driver, const String *path);
bool (*pathRemove)(void *driver, const String *path, bool recurse);

View File

@ -23,6 +23,8 @@ typedef struct StorageWriteInterface
bool atomic;
bool createPath;
bool compressible; // Is this file compressible?
unsigned int compressLevel; // Level to use for compression
const String *group; // Group that owns the file
mode_t modeFile;
mode_t modePath;

View File

@ -534,6 +534,8 @@ unit:
storage/remote/storage: full
storage/remote/write: full
storage/helper: full
storage/read: full
storage/write: full
storage/storage: full
# ----------------------------------------------------------------------------------------------------------------------------

View File

@ -186,10 +186,9 @@ testRun(void)
// -------------------------------------------------------------------------------------------------------------------------
StorageWrite *infoWrite = storageNewWriteNP(storageTest, strNew("repo/archive/test1/archive.info"));
ioWriteFilterGroupSet(
storageWriteIo(infoWrite),
ioFilterGroupAdd(
ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("12345678"), NULL)));
ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc,
BUFSTRDEF("12345678"), NULL));
storagePutNP(
infoWrite,
@ -208,11 +207,10 @@ testRun(void)
strNew(
"repo/archive/test1/10-1/01ABCDEF01ABCDEF/01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.gz"));
IoFilterGroup *filterGroup = ioFilterGroupNew();
IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(destination));
ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false));
ioFilterGroupAdd(
filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("worstpassphraseever"), NULL));
ioWriteFilterGroupSet(storageWriteIo(destination), filterGroup);
storagePutNP(destination, buffer);
TEST_RESULT_INT(

View File

@ -373,10 +373,9 @@ testRun(void)
StorageWrite *infoWrite = storageNewWriteNP(storageTest, strNew("repo/archive/test/archive.info"));
ioWriteFilterGroupSet(
storageWriteIo(infoWrite),
ioFilterGroupAdd(
ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("badpassphrase"), NULL)));
ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc,
BUFSTRDEF("badpassphrase"), NULL));
storagePutNP(
infoWrite,

View File

@ -139,9 +139,8 @@ testRun(void)
bufUsedSet(buffer, bufSize(buffer));
memset(bufPtr(buffer), 0, bufSize(buffer));
IoWrite *write = ioWriteFilterGroupSet(
ioBufferWriteNew(bufferOut),
ioFilterGroupAdd(ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0)));
IoWrite *write = ioBufferWriteNew(bufferOut);
ioFilterGroupAdd(ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0));
ioWriteOpen(write);
ioWrite(write, buffer);
ioWriteClose(write);
@ -161,10 +160,9 @@ testRun(void)
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x00)))->pd_lsn.walid = 0xF0F0F0F0;
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x00)))->pd_lsn.xrecoff = 0xF0F0F0F0;
write = ioWriteFilterGroupSet(
ioBufferWriteNew(bufferOut),
write = ioBufferWriteNew(bufferOut);
ioFilterGroupAdd(
ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)));
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
ioWriteOpen(write);
ioWrite(write, buffer);
ioWriteClose(write);
@ -209,10 +207,9 @@ testRun(void)
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x07)))->pd_upper = 0x01;
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x07)))->pd_lsn.xrecoff = 0x7;
write = ioWriteFilterGroupSet(
ioBufferWriteNew(bufferOut),
write = ioBufferWriteNew(bufferOut);
ioFilterGroupAdd(
ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)));
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
ioWriteOpen(write);
ioWrite(write, buffer);
ioWriteClose(write);
@ -227,10 +224,9 @@ testRun(void)
bufUsedSet(buffer, bufSize(buffer));
memset(bufPtr(buffer), 0, bufSize(buffer));
write = ioWriteFilterGroupSet(
ioBufferWriteNew(bufferOut),
write = ioBufferWriteNew(bufferOut);
ioFilterGroupAdd(
ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)));
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
ioWriteOpen(write);
ioWrite(write, buffer);
ioWriteClose(write);
@ -245,10 +241,9 @@ testRun(void)
bufUsedSet(buffer, bufSize(buffer));
memset(bufPtr(buffer), 0, bufSize(buffer));
write = ioWriteFilterGroupSet(
ioBufferWriteNew(bufferOut),
write = ioBufferWriteNew(bufferOut);
ioFilterGroupAdd(
ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)));
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
ioWriteOpen(write);
ioWrite(write, buffer);
TEST_ERROR(ioWrite(write, buffer), AssertError, "should not be possible to see two misaligned pages in a row");

View File

@ -65,10 +65,9 @@ testRun(void)
// Create a compressed encrypted repo file
StorageWrite *ceRepoFile = storageNewWriteNP(
storageRepoWrite(), strNewFmt(STORAGE_REPO_BACKUP "/%s/%s.gz", strPtr(repoFileReferenceFull), strPtr(repoFile1)));
IoFilterGroup *filterGroup = ioFilterGroupNew();
IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(ceRepoFile));
ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false));
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("badpass"), NULL));
ioWriteFilterGroupSet(storageWriteIo(ceRepoFile), filterGroup);
storagePutNP(ceRepoFile, BUFSTRDEF("acefile"));

View File

@ -16,10 +16,8 @@ testCompress(IoFilter *compress, Buffer *decompressed, size_t inputSize, size_t
size_t inputTotal = 0;
ioBufferSizeSet(outputSize);
IoFilterGroup *filterGroup = ioFilterGroupNew();
ioFilterGroupAdd(filterGroup, compress);
IoWrite *write = ioBufferWriteNew(compressed);
ioWriteFilterGroupSet(write, filterGroup);
ioFilterGroupAdd(ioWriteFilterGroup(write), compress);
ioWriteOpen(write);
// Compress input data
@ -52,10 +50,8 @@ testDecompress(IoFilter *decompress, Buffer *compressed, size_t inputSize, size_
Buffer *output = bufNew(outputSize);
ioBufferSizeSet(inputSize);
IoFilterGroup *filterGroup = ioFilterGroupNew();
ioFilterGroupAdd(filterGroup, decompress);
IoRead *read = ioBufferReadNew(compressed);
ioReadFilterGroupSet(read, filterGroup);
ioFilterGroupAdd(ioReadFilterGroup(read), decompress);
ioReadOpen(read);
while (!ioReadEof(read))
@ -110,8 +106,15 @@ testRun(void)
Buffer *compressed = NULL;
Buffer *decompressed = bufNewC(simpleData, strlen(simpleData));
VariantList *compressParamList = varLstNew();
varLstAdd(compressParamList, varNewUInt(3));
varLstAdd(compressParamList, varNewBool(false));
VariantList *decompressParamList = varLstNew();
varLstAdd(decompressParamList, varNewBool(false));
TEST_ASSIGN(
compressed, testCompress(gzipCompressNew(3, false), decompressed, 1024, 1024),
compressed, testCompress(gzipCompressNewVar(compressParamList), decompressed, 1024, 1024),
"simple data - compress large in/large out buffer");
TEST_RESULT_BOOL(
@ -127,7 +130,7 @@ testRun(void)
"simple data - compress small in/small out buffer");
TEST_RESULT_BOOL(
bufEq(decompressed, testDecompress(gzipDecompressNew(false), compressed, 1024, 1024)), true,
bufEq(decompressed, testDecompress(gzipDecompressNewVar(decompressParamList), compressed, 1024, 1024)), true,
"simple data - decompress large in/large out buffer");
TEST_RESULT_BOOL(

View File

@ -3,6 +3,8 @@ Test IO
***********************************************************************************************************************************/
#include <fcntl.h>
#include "common/type/json.h"
#include "common/harnessFork.h"
/***********************************************************************************************************************************
@ -111,7 +113,7 @@ ioTestFilterSizeNew(const char *type)
IoTestFilterSize *driver = memNew(sizeof(IoTestFilterSize));
driver->memContext = MEM_CONTEXT_NEW();
this = ioFilterNewP(strNew(type), driver, .in = ioTestFilterSizeProcess, .result = ioTestFilterSizeResult);
this = ioFilterNewP(strNew(type), driver, NULL, .in = ioTestFilterSizeProcess, .result = ioTestFilterSizeResult);
}
MEM_CONTEXT_NEW_END();
@ -216,8 +218,13 @@ ioTestFilterMultiplyNew(const char *type, unsigned int multiplier, unsigned int
driver->flushTotal = flushTotal;
driver->flushChar = flushChar;
VariantList *paramList = varLstNew();
varLstAdd(paramList, varNewStrZ(type));
varLstAdd(paramList, varNewUInt(multiplier));
varLstAdd(paramList, varNewUInt(flushTotal));
this = ioFilterNewP(
strNew(type), driver, .done = ioTestFilterMultiplyDone, .inOut = ioTestFilterMultiplyProcess,
strNew(type), driver, paramList, .done = ioTestFilterMultiplyDone, .inOut = ioTestFilterMultiplyProcess,
.inputSame = ioTestFilterMultiplyInputSame);
}
MEM_CONTEXT_NEW_END();
@ -285,18 +292,19 @@ testRun(void)
bufferOriginal = bufNewC("123", 3);
TEST_ASSIGN(bufferRead, ioBufferReadNew(bufferOriginal), "create buffer read object");
IoFilterGroup *filterGroup = NULL;
TEST_RESULT_VOID(ioFilterGroupMove(filterGroup, memContextTop()), "move null filter group is a noop");
TEST_ASSIGN(filterGroup, ioFilterGroupNew(), " create new filter group");
IoFilter *sizeFilter = ioSizeNew();
TEST_RESULT_PTR(ioFilterGroupAdd(filterGroup, sizeFilter), filterGroup, " add filter to filter group");
TEST_RESULT_PTR(
ioFilterGroupAdd(ioReadFilterGroup(bufferRead), sizeFilter), bufferRead->filterGroup, " add filter to filter group");
TEST_RESULT_VOID(
ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("double", 2, 3, 'X')), " add filter to filter group");
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, ioSizeNew()), " add filter to filter group");
ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioTestFilterMultiplyNew("double", 2, 3, 'X')),
" add filter to filter group");
TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioSizeNew()), " add filter to filter group");
IoFilter *bufferFilter = ioBufferNew();
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, bufferFilter), " add filter to filter group");
TEST_RESULT_VOID(ioReadFilterGroupSet(bufferRead, filterGroup), " add filter group to read io");
TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), bufferFilter), " add filter to filter group");
TEST_RESULT_PTR(ioFilterMove(NULL, memContextTop()), NULL, " move NULL filter to top context");
TEST_RESULT_STR(
strPtr(jsonFromVar(ioFilterGroupParamAll(ioReadFilterGroup(bufferRead)), 0)),
"{\"buffer\":null,\"double\":[\"double\",2,3],\"size\":null}", " check filter params");
TEST_RESULT_BOOL(ioReadOpen(bufferRead), true, " open");
TEST_RESULT_INT(ioReadHandle(bufferRead), -1, " handle invalid");
@ -322,21 +330,26 @@ testRun(void)
TEST_RESULT_BOOL(ioBufferRead(ioReadDriver(bufferRead), buffer, true), 0, " eof from driver");
TEST_RESULT_SIZE(ioRead(bufferRead, buffer), 0, " read 0 bytes");
TEST_RESULT_VOID(ioReadClose(bufferRead), " close buffer read object");
TEST_RESULT_STR(
strPtr(jsonFromVar(ioFilterGroupResultAll(ioReadFilterGroup(bufferRead)), 0)),
"{\"buffer\":null,\"double\":null,\"size\":[3,9]}",
" check filter result all");
TEST_RESULT_PTR(ioReadFilterGroup(bufferRead), filterGroup, " check filter group");
TEST_RESULT_PTR(ioReadFilterGroup(bufferRead), ioReadFilterGroup(bufferRead), " check filter group");
TEST_RESULT_UINT(
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 0)), 3,
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(ioReadFilterGroup(bufferRead), ioFilterType(sizeFilter))), 0)), 3,
" check filter result");
TEST_RESULT_PTR(ioFilterGroupResult(filterGroup, strNew("double")), NULL, " check filter result is NULL");
TEST_RESULT_PTR(
ioFilterGroupResult(ioReadFilterGroup(bufferRead), strNew("double")), NULL, " check filter result is NULL");
TEST_RESULT_UINT(
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 1)), 9,
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(ioReadFilterGroup(bufferRead), ioFilterType(sizeFilter))), 1)), 9,
" check filter result");
TEST_RESULT_PTR(ioFilterDriver(bufferFilter), bufferFilter->driver, " check filter driver");
TEST_RESULT_PTR(ioFilterInterface(bufferFilter), &bufferFilter->interface, " check filter interface");
TEST_RESULT_VOID(ioFilterFree(bufferFilter), " free buffer filter");
TEST_RESULT_VOID(ioFilterGroupFree(filterGroup), " free filter group object");
TEST_RESULT_VOID(ioFilterGroupFree(ioReadFilterGroup(bufferRead)), " free filter group object");
// Read a zero-size buffer to ensure filters are still processed even when there is no input. Some filters (e.g. encryption
// and compression) will produce output even if there is no input.
@ -347,7 +360,7 @@ testRun(void)
TEST_ASSIGN(bufferRead, ioBufferReadNew(bufferOriginal), "create buffer read object");
TEST_RESULT_VOID(
ioReadFilterGroupSet(bufferRead, ioFilterGroupAdd(ioFilterGroupNew(), ioTestFilterMultiplyNew("double", 2, 5, 'Y'))),
ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioTestFilterMultiplyNew("double", 2, 5, 'Y')),
" add filter that produces output with no input");
TEST_RESULT_BOOL(ioReadOpen(bufferRead), true, " open read");
TEST_RESULT_UINT(ioRead(bufferRead, buffer), 5, " read 5 chars");
@ -437,8 +450,7 @@ testRun(void)
Buffer *buffer = bufNew(0);
TEST_ASSIGN(bufferWrite, ioBufferWriteNew(buffer), "create buffer write object");
IoFilterGroup *filterGroup = NULL;
TEST_ASSIGN(filterGroup, ioFilterGroupNew(), " create new filter group");
IoFilterGroup *filterGroup = ioWriteFilterGroup(bufferWrite);
IoFilter *sizeFilter = ioSizeNew();
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, sizeFilter), " add filter to filter group");
TEST_RESULT_VOID(
@ -447,7 +459,6 @@ testRun(void)
ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("single", 1, 1, 'Y')),
" add filter to filter group");
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, ioTestFilterSizeNew("size2")), " add filter to filter group");
TEST_RESULT_VOID(ioWriteFilterGroupSet(bufferWrite, filterGroup), " add filter group to write io");
TEST_RESULT_VOID(ioWriteOpen(bufferWrite), " open buffer write object");
TEST_RESULT_INT(ioWriteHandle(bufferWrite), -1, " handle invalid");

View File

@ -62,10 +62,9 @@ testRun(void)
//--------------------------------------------------------------------------------------------------------------------------
StorageWrite *infoWrite = storageNewWriteNP(storageLocalWrite(), fileName);
ioWriteFilterGroupSet(
storageWriteIo(infoWrite),
ioFilterGroupAdd(
ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("12345678"), NULL)));
ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc,
BUFSTRDEF("12345678"), NULL));
storageRemoveNP(storageLocalWrite(), fileNameCopy);
storagePutNP(

View File

@ -604,11 +604,6 @@ testRun(void)
TEST_RESULT_INT(
ioReadHandle(storageReadIo(file)), ((StorageReadPosix *)file->driver)->handle, "check read handle");
TEST_RESULT_VOID(ioReadClose(storageReadIo(file)), " close file");
// -------------------------------------------------------------------------------------------------------------------------
IoFilterGroup *filterGroup = ioFilterGroupNew();
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName, .filterGroup = filterGroup), "new read file with filters");
TEST_RESULT_PTR(ioReadFilterGroup(storageReadIo(file)), filterGroup, " check filter group is set");
}
// *****************************************************************************************************************************
@ -670,13 +665,6 @@ testRun(void)
TEST_RESULT_VOID(storageWritePosixClose(storageWriteDriver(file)), " close file again");
TEST_RESULT_INT(storageInfoNP(storageTest, strPath(fileName)).mode, 0700, " check path mode");
TEST_RESULT_INT(storageInfoNP(storageTest, fileName).mode, 0600, " check file mode");
// -------------------------------------------------------------------------------------------------------------------------
IoFilterGroup *filterGroup = ioFilterGroupNew();
TEST_ASSIGN(file, storageNewWriteP(storageTest, fileName, .filterGroup = filterGroup), "new write file with filters");
TEST_RESULT_VOID(ioWriteOpen(storageWriteIo(file)), " open file");
TEST_RESULT_VOID(ioWriteClose(storageWriteIo(file)), " close file");
TEST_RESULT_PTR(ioWriteFilterGroup(storageWriteIo(file)), filterGroup, " check filter group is set");
}
// *****************************************************************************************************************************

View File

@ -1,6 +1,7 @@
/***********************************************************************************************************************************
Test Remote Storage
***********************************************************************************************************************************/
#include "common/crypto/cipherBlock.h"
#include "common/io/bufferRead.h"
#include "common/io/bufferWrite.h"
@ -139,6 +140,9 @@ testRun(void)
storagePutNP(storageNewWriteNP(storageTest, strNew("repo/test.txt")), contentBuf);
// Disable protocol compression in the storage object to test no compression
((StorageRemote *)storageRemote->driver)->compressLevel = 0;
StorageRead *fileRead = NULL;
ioBufferSizeSet(8193);
@ -150,8 +154,20 @@ testRun(void)
storageReadRemote(storageRead(fileRead), bufNew(32), false), 0,
"nothing more to read");
TEST_ASSIGN(fileRead, storageNewReadNP(storageRemote, strNew("test.txt")), "get file");
TEST_RESULT_BOOL(bufEq(storageGetNP(fileRead), contentBuf), true, " check contents");
TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, bufSize(contentBuf), " check read size");
// Enable protocol compression in the storage object
((StorageRemote *)storageRemote->driver)->compressLevel = 3;
TEST_ASSIGN(
fileRead, storageNewReadP(storageRemote, strNew("test.txt"), .compressible = true), "get file (protocol compress)");
TEST_RESULT_BOOL(bufEq(storageGetNP(fileRead), contentBuf), true, " check contents");
// We don't know how much protocol compression there will be exactly, but make sure this is some
TEST_RESULT_BOOL(
bufEq(storageGetNP(storageNewReadNP(storageRemote, strNew("test.txt"))), contentBuf), true, "get file again");
((StorageReadRemote *)fileRead->driver)->protocolReadBytes < bufSize(contentBuf), true,
" check compressed read size");
TEST_ERROR(
storageRemoteProtocolBlockSize(strNew("bogus")), ProtocolError, "'bogus' is not a valid block size message");
@ -161,6 +177,7 @@ testRun(void)
VariantList *paramList = varLstNew();
varLstAdd(paramList, varNewStr(strNew("missing.txt")));
varLstAdd(paramList, varNewBool(true));
varLstAdd(paramList, varNewKv(kvNew()));
TEST_RESULT_BOOL(
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), true,
@ -178,6 +195,12 @@ testRun(void)
varLstAdd(paramList, varNewStr(strNew("test.txt")));
varLstAdd(paramList, varNewBool(false));
// Create filters to test filter logic
IoFilterGroup *filterGroup = ioFilterGroupNew();
ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false));
ioFilterGroupAdd(filterGroup, gzipDecompressNew(false));
varLstAdd(paramList, ioFilterGroupParamAll(filterGroup));
TEST_RESULT_BOOL(
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), true, "protocol open read");
TEST_RESULT_STR(
@ -190,6 +213,21 @@ testRun(void)
bufUsedSet(serverWrite, 0);
ioBufferSizeSet(8192);
// Check for error on a bogus filter
// -------------------------------------------------------------------------------------------------------------------------
paramList = varLstNew();
varLstAdd(paramList, varNewStr(strNew("test.txt")));
varLstAdd(paramList, varNewBool(false));
// Create filters to test filter logic
filterGroup = ioFilterGroupNew();
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("X"), NULL));
varLstAdd(paramList, ioFilterGroupParamAll(filterGroup));
TEST_ERROR(
storageRemoteProtocol(
PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), AssertError, "unable to add filter 'cipherBlock'");
}
// *****************************************************************************************************************************
@ -213,6 +251,9 @@ testRun(void)
// -------------------------------------------------------------------------------------------------------------------------
ioBufferSizeSet(9999);
// Disable protocol compression in the storage object to test no compression
((StorageRemote *)storageRemote->driver)->compressLevel = 0;
StorageWrite *write = NULL;
TEST_ASSIGN(write, storageNewWriteNP(storageRemote, strNew("test.txt")), "new write file");
@ -225,6 +266,7 @@ testRun(void)
TEST_RESULT_BOOL(storageWriteSyncPath(write), true, "path is synced");
TEST_RESULT_VOID(storagePutNP(write, contentBuf), "write file");
TEST_RESULT_UINT(((StorageWriteRemote *)write->driver)->protocolWriteBytes, bufSize(contentBuf), " check write size");
TEST_RESULT_VOID(storageWriteRemoteClose((StorageWriteRemote *)storageWriteDriver(write)), "close file again");
TEST_RESULT_VOID(storageWriteFree(write), "free file");
@ -232,6 +274,9 @@ testRun(void)
TEST_RESULT_BOOL(
bufEq(storageGetNP(storageNewReadNP(storageRemote, strNew("test.txt"))), contentBuf), true, "check file");
// Enable protocol compression in the storage object
((StorageRemote *)storageRemote->driver)->compressLevel = 3;
// Write the file again, but this time free it before close and make sure the .tmp file is left
// -------------------------------------------------------------------------------------------------------------------------
TEST_ASSIGN(write, storageNewWriteNP(storageRemote, strNew("test2.txt")), "new write file");
@ -244,6 +289,14 @@ testRun(void)
TEST_RESULT_UINT(
storageInfoNP(storageTest, strNew("repo/test2.txt.pgbackrest.tmp")).size, 16384, "file exists and is partial");
// Write the file again with protocol compression
// -------------------------------------------------------------------------------------------------------------------------
TEST_ASSIGN(write, storageNewWriteP(storageRemote, strNew("test2.txt"), .compressible = true), "new write file (compress)");
TEST_RESULT_VOID(storagePutNP(write, contentBuf), "write file");
TEST_RESULT_BOOL(
((StorageWriteRemote *)write->driver)->protocolWriteBytes < bufSize(contentBuf), true,
" check compressed write size");
// Check protocol function directly (complete write)
// -------------------------------------------------------------------------------------------------------------------------
ioBufferSizeSet(10);
@ -259,6 +312,7 @@ testRun(void)
varLstAdd(paramList, varNewBool(true));
varLstAdd(paramList, varNewBool(true));
varLstAdd(paramList, varNewBool(true));
varLstAdd(paramList, varNewKv(kvNew()));
// Generate input (includes the input for the test below -- need a way to reset this for better testing)
bufCat(
@ -299,6 +353,7 @@ testRun(void)
varLstAdd(paramList, varNewBool(true));
varLstAdd(paramList, varNewBool(true));
varLstAdd(paramList, varNewBool(true));
varLstAdd(paramList, varNewKv(kvNew()));
TEST_RESULT_BOOL(
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR, paramList, server), true, "protocol open write");