From 039e515a319216035187c89efccf97143d4cac03 Mon Sep 17 00:00:00 2001 From: David Steele Date: Mon, 24 Jun 2019 10:20:47 -0400 Subject: [PATCH] Allow protocol compression when read/writing remote files. If the file is compressible (i.e. not encrypted or already compressed) it can be marked as such in storageNewRead()/storageNewWrite(). If the file is being read from/written to a remote it will be compressed in transit using gzip. Simplify filter group handling by having the IoRead/IoWrite objects create the filter group automatically. This removes the need for a lot of NULL checking and has a negligible effect on performance since a filter group needs to be created eventually unless the source file is missing. Allow filters to be created using a VariantList so filter parameters can be passed to the remote. --- src/Makefile.in | 24 ++--- src/command/archive/get/file.c | 22 ++-- src/command/archive/push/file.c | 26 +++-- src/command/backup/pageChecksum.c | 2 +- src/command/restore/file.c | 28 ++++-- src/common/compress/gzip/compress.c | 16 ++- src/common/compress/gzip/compress.h | 7 ++ src/common/compress/gzip/decompress.c | 15 ++- src/common/compress/gzip/decompress.h | 7 ++ src/common/crypto/cipherBlock.c | 2 +- src/common/crypto/hash.c | 2 +- src/common/io/filter/buffer.c | 2 +- src/common/io/filter/filter.c | 22 +++- src/common/io/filter/filter.intern.h | 8 +- src/common/io/filter/group.c | 112 +++++++++++++++------ src/common/io/filter/group.h | 5 +- src/common/io/filter/size.c | 2 +- src/common/io/read.c | 37 +------ src/common/io/read.h | 1 - src/common/io/write.c | 40 ++------ src/common/io/write.h | 1 - src/info/info.c | 15 ++- src/storage/helper.c | 2 +- src/storage/posix/storage.c | 6 +- src/storage/read.intern.h | 2 + src/storage/remote/protocol.c | 44 +++++++- src/storage/remote/read.c | 52 ++++++++-- src/storage/remote/read.h | 4 +- src/storage/remote/storage.c | 20 +++- src/storage/remote/storage.h | 3 +- src/storage/remote/write.c | 49 +++++++-- src/storage/remote/write.h | 3 +- src/storage/s3/storage.c | 6 +- src/storage/storage.c | 28 ++---- src/storage/storage.h | 4 +- src/storage/storage.intern.h | 5 +- src/storage/write.intern.h | 2 + test/define.yaml | 2 + test/src/module/command/archiveGetTest.c | 10 +- test/src/module/command/archivePushTest.c | 7 +- test/src/module/command/backupCommonTest.c | 33 +++--- test/src/module/command/restoreTest.c | 3 +- test/src/module/common/compressGzipTest.c | 19 ++-- test/src/module/common/ioTest.c | 49 +++++---- test/src/module/info/infoTest.c | 7 +- test/src/module/storage/posixTest.c | 12 --- test/src/module/storage/remoteTest.c | 57 ++++++++++- 47 files changed, 529 insertions(+), 296 deletions(-) diff --git a/src/Makefile.in b/src/Makefile.in index a5b05b28d..c14c40350 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -216,7 +216,7 @@ command/archive/push/push.o: command/archive/push/push.c build.auto.h command/ar command/backup/common.o: command/backup/common.c build.auto.h command/backup/common.h common/assert.h common/debug.h common/error.auto.h common/error.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/string.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c command/backup/common.c -o command/backup/common.o -command/backup/pageChecksum.o: command/backup/pageChecksum.c build.auto.h command/backup/pageChecksum.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h postgres/pageChecksum.h +command/backup/pageChecksum.o: command/backup/pageChecksum.c build.auto.h command/backup/pageChecksum.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h postgres/pageChecksum.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c command/backup/pageChecksum.c -o command/backup/pageChecksum.o command/command.o: command/command.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/tls/client.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h version.h @@ -252,19 +252,19 @@ command/storage/list.o: command/storage/list.c build.auto.h common/assert.h comm common/compress/gzip/common.o: common/compress/gzip/common.c build.auto.h common/assert.h common/compress/gzip/common.h common/debug.h common/error.auto.h common/error.h common/logLevel.h common/memContext.h common/stackTrace.h common/type/convert.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/common.c -o common/compress/gzip/common.o -common/compress/gzip/compress.o: common/compress/gzip/compress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/compress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h +common/compress/gzip/compress.o: common/compress/gzip/compress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/compress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/compress.c -o common/compress/gzip/compress.o -common/compress/gzip/decompress.o: common/compress/gzip/decompress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h +common/compress/gzip/decompress.o: common/compress/gzip/decompress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/decompress.c -o common/compress/gzip/decompress.o -common/crypto/cipherBlock.o: common/crypto/cipherBlock.c build.auto.h common/assert.h common/crypto/cipherBlock.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h +common/crypto/cipherBlock.o: common/crypto/cipherBlock.c build.auto.h common/assert.h common/crypto/cipherBlock.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/cipherBlock.c -o common/crypto/cipherBlock.o common/crypto/common.o: common/crypto/common.c build.auto.h common/assert.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/log.h common/logLevel.h common/stackTrace.h common/type/convert.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/common.c -o common/crypto/common.o -common/crypto/hash.o: common/crypto/hash.c build.auto.h common/assert.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h +common/crypto/hash.o: common/crypto/hash.c build.auto.h common/assert.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/hash.c -o common/crypto/hash.o common/debug.o: common/debug.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/logLevel.h common/stackTrace.h common/type/convert.h @@ -297,16 +297,16 @@ common/io/bufferRead.o: common/io/bufferRead.c build.auto.h common/assert.h comm common/io/bufferWrite.o: common/io/bufferWrite.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/bufferWrite.h common/io/filter/filter.h common/io/filter/group.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/bufferWrite.c -o common/io/bufferWrite.o -common/io/filter/buffer.o: common/io/filter/buffer.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h +common/io/filter/buffer.o: common/io/filter/buffer.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/buffer.c -o common/io/filter/buffer.o -common/io/filter/filter.o: common/io/filter/filter.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h +common/io/filter/filter.o: common/io/filter/filter.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/filter.c -o common/io/filter/filter.o -common/io/filter/group.o: common/io/filter/group.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/group.h common/io/io.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/list.h common/type/string.h common/type/variant.h common/type/variantList.h +common/io/filter/group.o: common/io/filter/group.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/group.h common/io/io.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/list.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/group.c -o common/io/filter/group.o -common/io/filter/size.o: common/io/filter/size.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/size.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h +common/io/filter/size.o: common/io/filter/size.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/size.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/size.c -o common/io/filter/size.o common/io/handleRead.o: common/io/handleRead.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/handleRead.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h @@ -510,16 +510,16 @@ storage/posix/write.o: storage/posix/write.c build.auto.h common/assert.h common storage/read.o: storage/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h storage/read.h storage/read.intern.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/read.c -o storage/read.o -storage/remote/protocol.o: storage/remote/protocol.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h +storage/remote/protocol.o: storage/remote/protocol.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/protocol.c -o storage/remote/protocol.o -storage/remote/read.o: storage/remote/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h +storage/remote/read.o: storage/remote/read.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/read.c -o storage/remote/read.o storage/remote/storage.o: storage/remote/storage.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/storage.c -o storage/remote/storage.o -storage/remote/write.o: storage/remote/write.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h +storage/remote/write.o: storage/remote/write.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h $(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/write.c -o storage/remote/write.o storage/s3/read.o: storage/s3/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h storage/info.h storage/read.h storage/read.intern.h storage/s3/read.h storage/s3/storage.h storage/s3/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h diff --git a/src/command/archive/get/file.c b/src/command/archive/get/file.c index 946e073ba..51d26c2c0 100644 --- a/src/command/archive/get/file.c +++ b/src/command/archive/get/file.c @@ -132,6 +132,9 @@ archiveGetFile( // By default result indicates WAL segment not found int result = 1; + // Is the file compressible during the copy? + bool compressible = true; + // Test for stop file lockStopTest(); @@ -146,26 +149,27 @@ archiveGetFile( storage, walDestination, .noCreatePath = true, .noSyncFile = !durable, .noSyncPath = !durable, .noAtomic = !durable); - // Add filters - IoFilterGroup *filterGroup = ioFilterGroupNew(); - // If there is a cipher then add the decrypt filter if (cipherType != cipherTypeNone) { ioFilterGroupAdd( - filterGroup, cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(archiveGetCheckResult.cipherPass), NULL)); + ioWriteFilterGroup(storageWriteIo(destination)), cipherBlockNew(cipherModeDecrypt, cipherType, + BUFSTR(archiveGetCheckResult.cipherPass), NULL)); + compressible = false; } // If file is compressed then add the decompression filter if (strEndsWithZ(archiveGetCheckResult.archiveFileActual, "." GZIP_EXT)) - ioFilterGroupAdd(filterGroup, gzipDecompressNew(false)); - - ioWriteFilterGroupSet(storageWriteIo(destination), filterGroup); + { + ioFilterGroupAdd(ioWriteFilterGroup(storageWriteIo(destination)), gzipDecompressNew(false)); + compressible = false; + } // Copy the file storageCopyNP( - storageNewReadNP( - storageRepo(), strNewFmt("%s/%s", STORAGE_REPO_ARCHIVE, strPtr(archiveGetCheckResult.archiveFileActual))), + storageNewReadP( + storageRepo(), strNewFmt("%s/%s", STORAGE_REPO_ARCHIVE, strPtr(archiveGetCheckResult.archiveFileActual)), + .compressible = compressible), destination); // The WAL file was found diff --git a/src/command/archive/push/file.c b/src/command/archive/push/file.c index a32f42304..27cd8f76b 100644 --- a/src/command/archive/push/file.c +++ b/src/command/archive/push/file.c @@ -74,8 +74,7 @@ archivePushFile( { // Generate a sha1 checksum for the wal segment. ??? Probably need a function in storage for this. IoRead *read = storageReadIo(storageNewReadNP(storageLocal(), walSource)); - IoFilterGroup *filterGroup = ioFilterGroupAdd(ioFilterGroupNew(), cryptoHashNew(HASH_TYPE_SHA1_STR)); - ioReadFilterGroupSet(read, filterGroup); + ioFilterGroupAdd(ioReadFilterGroup(read), cryptoHashNew(HASH_TYPE_SHA1_STR)); Buffer *buffer = bufNew(ioBufferSize()); ioReadOpen(read); @@ -88,7 +87,7 @@ archivePushFile( while (!ioReadEof(read)); ioReadClose(read); - const String *walSegmentChecksum = varStr(ioFilterGroupResult(filterGroup, CRYPTO_HASH_FILTER_TYPE_STR)); + const String *walSegmentChecksum = varStr(ioFilterGroupResult(ioReadFilterGroup(read), CRYPTO_HASH_FILTER_TYPE_STR)); // If the wal segment already exists in the repo then compare checksums walSegmentFile = walSegmentFind(storageRepo(), archiveId, archiveFile); @@ -119,27 +118,32 @@ archivePushFile( { StorageRead *source = storageNewReadNP(storageLocal(), walSource); - // Add filters - IoFilterGroup *filterGroup = ioFilterGroupNew(); + // Is the file compressible during the copy? + bool compressible = true; // If the file will be compressed then add compression filter if (isSegment && compress) { strCat(archiveDestination, "." GZIP_EXT); - ioFilterGroupAdd(filterGroup, gzipCompressNew(compressLevel, false)); + ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(source)), gzipCompressNew(compressLevel, false)); + compressible = false; } // If there is a cipher then add the encrypt filter if (cipherType != cipherTypeNone) - ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL)); - - ioReadFilterGroupSet(storageReadIo(source), filterGroup); + { + ioFilterGroupAdd( + ioReadFilterGroup(storageReadIo(source)), + cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL)); + compressible = false; + } // Copy the file storageCopyNP( source, - storageNewWriteNP( - storageRepoWrite(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strPtr(archiveId), strPtr(archiveDestination)))); + storageNewWriteP( + storageRepoWrite(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strPtr(archiveId), strPtr(archiveDestination)), + .compressible = compressible)); } } MEM_CONTEXT_TEMP_END(); diff --git a/src/command/backup/pageChecksum.c b/src/command/backup/pageChecksum.c index eb457eceb..7d1d12e0d 100644 --- a/src/command/backup/pageChecksum.c +++ b/src/command/backup/pageChecksum.c @@ -229,7 +229,7 @@ pageChecksumNew(unsigned int segmentNo, unsigned int segmentPageTotal, size_t pa driver->valid = true; driver->align = true; - this = ioFilterNewP(PAGE_CHECKSUM_FILTER_TYPE_STR, driver, .in = pageChecksumProcess, .result = pageChecksumResult); + this = ioFilterNewP(PAGE_CHECKSUM_FILTER_TYPE_STR, driver, NULL, .in = pageChecksumProcess, .result = pageChecksumResult); } MEM_CONTEXT_NEW_END(); diff --git a/src/command/restore/file.c b/src/command/restore/file.c index eed4a66a6..cd1fb547c 100644 --- a/src/command/restore/file.c +++ b/src/command/restore/file.c @@ -55,7 +55,8 @@ restoreFile( // Was the file copied? bool result = true; - // Create destination file. We may not use this but it makes sense to only create it in one place if we do. + // Is the file compressible during the copy? + bool compressible = true; MEM_CONTEXT_TEMP_BEGIN() { @@ -81,13 +82,12 @@ restoreFile( if (info.size == pgFileSize) { // Generate checksum for the file if size is not zero - IoFilterGroup *filterGroup = ioFilterGroupNew(); + IoRead *read = NULL; if (info.size != 0) { - IoRead *read = storageReadIo(storageNewReadNP(storagePgWrite(), pgFile)); - ioFilterGroupAdd(filterGroup, cryptoHashNew(HASH_TYPE_SHA1_STR)); - ioReadFilterGroupSet(read, filterGroup); + read = storageReadIo(storageNewReadNP(storagePgWrite(), pgFile)); + ioFilterGroupAdd(ioReadFilterGroup(read), cryptoHashNew(HASH_TYPE_SHA1_STR)); Buffer *buffer = bufNew(ioBufferSize()); ioReadOpen(read); @@ -104,7 +104,8 @@ restoreFile( // If size and checksum are equal then no need to copy the file if (pgFileSize == 0 || - strEq(pgFileChecksum, varStr(ioFilterGroupResult(filterGroup, CRYPTO_HASH_FILTER_TYPE_STR)))) + strEq( + pgFileChecksum, varStr(ioFilterGroupResult(ioReadFilterGroup(read), CRYPTO_HASH_FILTER_TYPE_STR)))) { // Even if hash/size are the same set the time back to backup time. This helps with unit testing, but // also presents a pristine version of the database after restore. @@ -153,15 +154,21 @@ restoreFile( // Else perform the copy else { - IoFilterGroup *filterGroup = ioFilterGroupNew(); + IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(pgFileWrite)); // Add decryption filter if (cipherPass != NULL) + { ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeDecrypt, cipherTypeAes256Cbc, BUFSTR(cipherPass), NULL)); + compressible = false; + } // Add decompression filter if (repoFileCompressed) + { ioFilterGroupAdd(filterGroup, gzipDecompressNew(false)); + compressible = false; + } // Add sha1 filter ioFilterGroupAdd(filterGroup, cryptoHashNew(HASH_TYPE_SHA1_STR)); @@ -169,15 +176,14 @@ restoreFile( // Add size filter ioFilterGroupAdd(filterGroup, ioSizeNew()); - ioWriteFilterGroupSet(storageWriteIo(pgFileWrite), filterGroup); - // Copy file storageCopyNP( - storageNewReadNP( + storageNewReadP( storageRepo(), strNewFmt( STORAGE_REPO_BACKUP "/%s/%s%s", strPtr(repoFileReference), strPtr(repoFile), - repoFileCompressed ? "." GZIP_EXT : "")), + repoFileCompressed ? "." GZIP_EXT : ""), + .compressible = compressible), pgFileWrite); // Validate checksum diff --git a/src/common/compress/gzip/compress.c b/src/common/compress/gzip/compress.c index 3e21551e9..a13547ef1 100644 --- a/src/common/compress/gzip/compress.c +++ b/src/common/compress/gzip/compress.c @@ -17,8 +17,7 @@ Gzip Compress /*********************************************************************************************************************************** Filter type constant ***********************************************************************************************************************************/ -#define GZIP_COMPRESS_FILTER_TYPE "gzipCompress" - STRING_STATIC(GZIP_COMPRESS_FILTER_TYPE_STR, GZIP_COMPRESS_FILTER_TYPE); +STRING_EXTERN(GZIP_COMPRESS_FILTER_TYPE_STR, GZIP_COMPRESS_FILTER_TYPE); /*********************************************************************************************************************************** Object type @@ -185,12 +184,23 @@ gzipCompressNew(int level, bool raw) // Set free callback to ensure gzip context is freed memContextCallbackSet(driver->memContext, gzipCompressFreeResource, driver); + // Create param list + VariantList *paramList = varLstNew(); + varLstAdd(paramList, varNewInt(level)); + varLstAdd(paramList, varNewBool(raw)); + // Create filter interface this = ioFilterNewP( - GZIP_COMPRESS_FILTER_TYPE_STR, driver, .done = gzipCompressDone, .inOut = gzipCompressProcess, + GZIP_COMPRESS_FILTER_TYPE_STR, driver, paramList, .done = gzipCompressDone, .inOut = gzipCompressProcess, .inputSame = gzipCompressInputSame); } MEM_CONTEXT_NEW_END(); FUNCTION_LOG_RETURN(IO_FILTER, this); } + +IoFilter * +gzipCompressNewVar(const VariantList *paramList) +{ + return gzipCompressNew(varIntForce(varLstGet(paramList, 0)), varBool(varLstGet(paramList, 1))); +} diff --git a/src/common/compress/gzip/compress.h b/src/common/compress/gzip/compress.h index 32f677865..8c25ac92b 100644 --- a/src/common/compress/gzip/compress.h +++ b/src/common/compress/gzip/compress.h @@ -8,9 +8,16 @@ Compress IO using the gzip format. #include "common/io/filter/filter.h" +/*********************************************************************************************************************************** +Filter type constant +***********************************************************************************************************************************/ +#define GZIP_COMPRESS_FILTER_TYPE "gzipCompress" + STRING_DECLARE(GZIP_COMPRESS_FILTER_TYPE_STR); + /*********************************************************************************************************************************** Constructor ***********************************************************************************************************************************/ IoFilter *gzipCompressNew(int level, bool raw); +IoFilter *gzipCompressNewVar(const VariantList *paramList); #endif diff --git a/src/common/compress/gzip/decompress.c b/src/common/compress/gzip/decompress.c index 3d5305c16..737d0f3ea 100644 --- a/src/common/compress/gzip/decompress.c +++ b/src/common/compress/gzip/decompress.c @@ -17,8 +17,7 @@ Gzip Decompress /*********************************************************************************************************************************** Filter type constant ***********************************************************************************************************************************/ -#define GZIP_DECOMPRESS_FILTER_TYPE "gzipDecompress" - STRING_STATIC(GZIP_DECOMPRESS_FILTER_TYPE_STR, GZIP_DECOMPRESS_FILTER_TYPE); +STRING_EXTERN(GZIP_DECOMPRESS_FILTER_TYPE_STR, GZIP_DECOMPRESS_FILTER_TYPE); /*********************************************************************************************************************************** Object type @@ -162,12 +161,22 @@ gzipDecompressNew(bool raw) // Set free callback to ensure gzip context is freed memContextCallbackSet(driver->memContext, gzipDecompressFreeResource, driver); + // Create param list + VariantList *paramList = varLstNew(); + varLstAdd(paramList, varNewBool(raw)); + // Create filter interface this = ioFilterNewP( - GZIP_DECOMPRESS_FILTER_TYPE_STR, driver, .done = gzipDecompressDone, .inOut = gzipDecompressProcess, + GZIP_DECOMPRESS_FILTER_TYPE_STR, driver, paramList, .done = gzipDecompressDone, .inOut = gzipDecompressProcess, .inputSame = gzipDecompressInputSame); } MEM_CONTEXT_NEW_END(); FUNCTION_LOG_RETURN(IO_FILTER, this); } + +IoFilter * +gzipDecompressNewVar(const VariantList *paramList) +{ + return gzipDecompressNew(varBool(varLstGet(paramList, 0))); +} diff --git a/src/common/compress/gzip/decompress.h b/src/common/compress/gzip/decompress.h index 34365ac68..44c84ade6 100644 --- a/src/common/compress/gzip/decompress.h +++ b/src/common/compress/gzip/decompress.h @@ -8,9 +8,16 @@ Decompress IO from the gzip format. #include "common/io/filter/filter.h" +/*********************************************************************************************************************************** +Filter type constant +***********************************************************************************************************************************/ +#define GZIP_DECOMPRESS_FILTER_TYPE "gzipDecompress" + STRING_DECLARE(GZIP_DECOMPRESS_FILTER_TYPE_STR); + /*********************************************************************************************************************************** Constructor ***********************************************************************************************************************************/ IoFilter *gzipDecompressNew(bool raw); +IoFilter *gzipDecompressNewVar(const VariantList *paramList); #endif diff --git a/src/common/crypto/cipherBlock.c b/src/common/crypto/cipherBlock.c index e93bf3d71..f00a9429d 100644 --- a/src/common/crypto/cipherBlock.c +++ b/src/common/crypto/cipherBlock.c @@ -437,7 +437,7 @@ cipherBlockNew(CipherMode mode, CipherType cipherType, const Buffer *pass, const // Create filter interface this = ioFilterNewP( - CIPHER_BLOCK_FILTER_TYPE_STR, driver, .done = cipherBlockDone, .inOut = cipherBlockProcess, + CIPHER_BLOCK_FILTER_TYPE_STR, driver, NULL, .done = cipherBlockDone, .inOut = cipherBlockProcess, .inputSame = cipherBlockInputSame); } MEM_CONTEXT_NEW_END(); diff --git a/src/common/crypto/hash.c b/src/common/crypto/hash.c index 7e22673f6..fb282be09 100644 --- a/src/common/crypto/hash.c +++ b/src/common/crypto/hash.c @@ -164,7 +164,7 @@ cryptoHashNew(const String *type) cryptoError(!EVP_DigestInit_ex(driver->hashContext, driver->hashType, NULL), "unable to initialize hash context"); // Create filter interface - this = ioFilterNewP(CRYPTO_HASH_FILTER_TYPE_STR, driver, .in = cryptoHashProcess, .result = cryptoHashResult); + this = ioFilterNewP(CRYPTO_HASH_FILTER_TYPE_STR, driver, NULL, .in = cryptoHashProcess, .result = cryptoHashResult); } MEM_CONTEXT_NEW_END(); diff --git a/src/common/io/filter/buffer.c b/src/common/io/filter/buffer.c index f445aaea1..057fb64e9 100644 --- a/src/common/io/filter/buffer.c +++ b/src/common/io/filter/buffer.c @@ -121,7 +121,7 @@ ioBufferNew(void) IoBuffer *driver = memNew(sizeof(IoBuffer)); driver->memContext = memContextCurrent(); - this = ioFilterNewP(BUFFER_FILTER_TYPE_STR, driver, .inOut = ioBufferProcess, .inputSame = ioBufferInputSame); + this = ioFilterNewP(BUFFER_FILTER_TYPE_STR, driver, NULL, .inOut = ioBufferProcess, .inputSame = ioBufferInputSame); } MEM_CONTEXT_NEW_END(); diff --git a/src/common/io/filter/filter.c b/src/common/io/filter/filter.c index bb01bec91..1234e02c9 100644 --- a/src/common/io/filter/filter.c +++ b/src/common/io/filter/filter.c @@ -17,6 +17,7 @@ struct IoFilter MemContext *memContext; // Mem context of filter const String *type; // Filter type void *driver; // Filter driver + const VariantList *paramList; // Filter parameters IoFilterInterface interface; // Filter interface bool flushing; // Has the filter started flushing? @@ -30,11 +31,12 @@ New object Allocations will be in the memory context of the caller. ***********************************************************************************************************************************/ IoFilter * -ioFilterNew(const String *type, void *driver, IoFilterInterface interface) +ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterInterface interface) { FUNCTION_LOG_BEGIN(logLevelTrace); FUNCTION_LOG_PARAM(STRING, type); FUNCTION_LOG_PARAM_P(VOID, driver); + FUNCTION_LOG_PARAM(VARIANT_LIST, paramList); FUNCTION_LOG_PARAM(IO_FILTER_INTERFACE, interface); FUNCTION_LOG_END(); @@ -53,6 +55,7 @@ ioFilterNew(const String *type, void *driver, IoFilterInterface interface) this->memContext = memContextCurrent(); this->type = type; this->driver = driver; + this->paramList = paramList; this->interface = interface; FUNCTION_LOG_RETURN(IO_FILTER, this); @@ -202,7 +205,7 @@ ioFilterInterface(const IoFilter *this) /*********************************************************************************************************************************** Does filter produce output? -All InOut filters produce output. +All In filters produce output. ***********************************************************************************************************************************/ bool ioFilterOutput(const IoFilter *this) @@ -216,6 +219,21 @@ ioFilterOutput(const IoFilter *this) FUNCTION_TEST_RETURN(this->interface.inOut != NULL); } +/*********************************************************************************************************************************** +List of filter parameters +***********************************************************************************************************************************/ +const VariantList * +ioFilterParamList(const IoFilter *this) +{ + FUNCTION_TEST_BEGIN(); + FUNCTION_TEST_PARAM(IO_FILTER, this); + FUNCTION_TEST_END(); + + ASSERT(this != NULL); + + FUNCTION_TEST_RETURN(this->paramList); +} + /*********************************************************************************************************************************** Get filter result ***********************************************************************************************************************************/ diff --git a/src/common/io/filter/filter.intern.h b/src/common/io/filter/filter.intern.h index 05b202211..657811279 100644 --- a/src/common/io/filter/filter.intern.h +++ b/src/common/io/filter/filter.intern.h @@ -19,6 +19,7 @@ Each filter has a type that allows it to be identified in the filter list. #define COMMON_IO_FILTER_FILTER_INTERN_H #include "common/io/filter/filter.h" +#include "common/type/variantList.h" /*********************************************************************************************************************************** Constructor @@ -48,10 +49,10 @@ typedef struct IoFilterInterface Variant *(*result)(void *driver); } IoFilterInterface; -#define ioFilterNewP(type, driver, ...) \ - ioFilterNew(type, driver, (IoFilterInterface){__VA_ARGS__}) +#define ioFilterNewP(type, driver, paramList, ...) \ + ioFilterNew(type, driver, paramList, (IoFilterInterface){__VA_ARGS__}) -IoFilter *ioFilterNew(const String *type, void *driver, IoFilterInterface); +IoFilter *ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterInterface); /*********************************************************************************************************************************** Functions @@ -68,6 +69,7 @@ void *ioFilterDriver(IoFilter *this); bool ioFilterInputSame(const IoFilter *this); const IoFilterInterface *ioFilterInterface(const IoFilter *this); bool ioFilterOutput(const IoFilter *this); +const VariantList *ioFilterParamList(const IoFilter *this); /*********************************************************************************************************************************** Macros for function logging diff --git a/src/common/io/filter/group.c b/src/common/io/filter/group.c index 36f48db82..8023f21b9 100644 --- a/src/common/io/filter/group.c +++ b/src/common/io/filter/group.c @@ -106,7 +106,7 @@ ioFilterGroupAdd(IoFilterGroup *this, IoFilter *filter) Get a filter ***********************************************************************************************************************************/ static IoFilterData * -ioFilterGroupGet(IoFilterGroup *this, unsigned int filterIdx) +ioFilterGroupGet(const IoFilterGroup *this, unsigned int filterIdx) { FUNCTION_TEST_BEGIN(); FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this); @@ -136,13 +136,16 @@ ioFilterGroupOpen(IoFilterGroup *this) { // If the last filter is not an output filter then add a filter to buffer/copy data. Input filters won't copy to an output // buffer so we need some way to get the data to the output buffer. - if (lstSize(this->filterList) == 0 || !ioFilterOutput((ioFilterGroupGet(this, lstSize(this->filterList) - 1))->filter)) + if (ioFilterGroupSize(this) == 0 || + !ioFilterOutput((ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->filter)) + { ioFilterGroupAdd(this, ioBufferNew()); + } // Create filter input/output buffers. Input filters do not get an output buffer since they don't produce output. Buffer **lastOutputBuffer = NULL; - for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++) + for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++) { IoFilterData *filterData = ioFilterGroupGet(this, filterIdx); @@ -163,7 +166,7 @@ ioFilterGroupOpen(IoFilterGroup *this) // If this is not the last output filter then create a new output buffer for it. The output buffer for the last filter // will be provided to the process function. - if (ioFilterOutput(filterData->filter) && filterIdx < lstSize(this->filterList) - 1) + if (ioFilterOutput(filterData->filter) && filterIdx < ioFilterGroupSize(this) - 1) { filterData->output = bufNew(ioBufferSize()); lastOutputBuffer = &filterData->output; @@ -207,7 +210,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) // Assign input and output buffers this->input = input; - (ioFilterGroupGet(this, lstSize(this->filterList) - 1))->output = output; + (ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->output = output; // do @@ -220,7 +223,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) if (this->inputSame) { this->inputSame = false; - filterIdx = lstSize(this->filterList); + filterIdx = ioFilterGroupSize(this); do { @@ -242,7 +245,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) // Process forward from the filter that has input to process. This may be a filter that needs the same input or it may be // new input for the first filter. - for (; filterIdx < lstSize(this->filterList); filterIdx++) + for (; filterIdx < ioFilterGroupSize(this); filterIdx++) { IoFilterData *filterData = ioFilterGroupGet(this, filterIdx); @@ -288,7 +291,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output) this->done = true; this->inputSame = false; - for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++) + for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++) { IoFilterData *filterData = ioFilterGroupGet(this, filterIdx); @@ -321,7 +324,7 @@ ioFilterGroupClose(IoFilterGroup *this) ASSERT(this != NULL); ASSERT(this->opened && !this->closed); - for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++) + for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++) { IoFilterData *filterData = ioFilterGroupGet(this, filterIdx); const Variant *filterResult = ioFilterResult(filterData->filter); @@ -350,25 +353,6 @@ ioFilterGroupClose(IoFilterGroup *this) FUNCTION_LOG_RETURN_VOID(); } -/*********************************************************************************************************************************** -Move the object to a new context -***********************************************************************************************************************************/ -IoFilterGroup * -ioFilterGroupMove(IoFilterGroup *this, MemContext *parentNew) -{ - FUNCTION_TEST_BEGIN(); - FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this); - FUNCTION_TEST_PARAM(MEM_CONTEXT, parentNew); - FUNCTION_TEST_END(); - - ASSERT(parentNew != NULL); - - if (this != NULL) - memContextMove(this->memContext, parentNew); - - FUNCTION_TEST_RETURN(this); -} - /*********************************************************************************************************************************** Is the filter group done processing? ***********************************************************************************************************************************/ @@ -403,6 +387,37 @@ ioFilterGroupInputSame(const IoFilterGroup *this) FUNCTION_TEST_RETURN(this->inputSame); } +/*********************************************************************************************************************************** +Get all filters and parameters so they can be passed to a remote +***********************************************************************************************************************************/ +Variant * +ioFilterGroupParamAll(const IoFilterGroup *this) +{ + FUNCTION_LOG_BEGIN(logLevelDebug); + FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this); + FUNCTION_LOG_END(); + + ASSERT(this != NULL); + ASSERT(!this->opened); + ASSERT(this->filterList != NULL); + + KeyValue *result = kvNew(); + + MEM_CONTEXT_TEMP_BEGIN() + { + for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++) + { + IoFilter *filter = ioFilterGroupGet(this, filterIdx)->filter; + const VariantList *paramList = ioFilterParamList(filter); + + kvAdd(result, VARSTR(ioFilterType(filter)), paramList ? varNewVarLst(paramList) : NULL); + } + } + MEM_CONTEXT_TEMP_END(); + + FUNCTION_LOG_RETURN(VARIANT, varNewKv(result)); +} + /*********************************************************************************************************************************** Get filter results ***********************************************************************************************************************************/ @@ -429,11 +444,50 @@ ioFilterGroupResult(const IoFilterGroup *this, const String *filterType) FUNCTION_LOG_RETURN_CONST(VARIANT, result); } +/*********************************************************************************************************************************** +Get all filter results +***********************************************************************************************************************************/ +const Variant * +ioFilterGroupResultAll(const IoFilterGroup *this) +{ + FUNCTION_LOG_BEGIN(logLevelDebug); + FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this); + FUNCTION_LOG_END(); + + ASSERT(this != NULL); + ASSERT(this->closed); + + FUNCTION_LOG_RETURN_CONST(VARIANT, varNewKv(this->filterResult)); +} + +/*********************************************************************************************************************************** +Return total number of filters +***********************************************************************************************************************************/ +unsigned int +ioFilterGroupSize(const IoFilterGroup *this) +{ + FUNCTION_TEST_BEGIN(); + FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this); + FUNCTION_TEST_END(); + + FUNCTION_TEST_RETURN(lstSize(this->filterList)); +} + /*********************************************************************************************************************************** Render as string for logging ***********************************************************************************************************************************/ String * ioFilterGroupToLog(const IoFilterGroup *this) { - return strNewFmt("{inputSame: %s, done: %s}", cvtBoolToConstZ(this->inputSame), cvtBoolToConstZ(this->done)); + return strNewFmt( + "{inputSame: %s, done: %s" +#ifdef DEBUG + ", opened %s, flushing %s, closed %s" +#endif + "}", + cvtBoolToConstZ(this->inputSame), cvtBoolToConstZ(this->done) +#ifdef DEBUG + , cvtBoolToConstZ(this->opened), cvtBoolToConstZ(this->flushing), cvtBoolToConstZ(this->closed) +#endif + ); } diff --git a/src/common/io/filter/group.h b/src/common/io/filter/group.h index c44d41f43..8c41729c4 100644 --- a/src/common/io/filter/group.h +++ b/src/common/io/filter/group.h @@ -34,14 +34,15 @@ void ioFilterGroupOpen(IoFilterGroup *this); void ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output); void ioFilterGroupClose(IoFilterGroup *this); -IoFilterGroup *ioFilterGroupMove(IoFilterGroup *this, MemContext *parentNew); - /*********************************************************************************************************************************** Getters ***********************************************************************************************************************************/ bool ioFilterGroupDone(const IoFilterGroup *this); bool ioFilterGroupInputSame(const IoFilterGroup *this); +Variant *ioFilterGroupParamAll(const IoFilterGroup *this); const Variant *ioFilterGroupResult(const IoFilterGroup *this, const String *filterType); +const Variant *ioFilterGroupResultAll(const IoFilterGroup *this); +unsigned int ioFilterGroupSize(const IoFilterGroup *this); /*********************************************************************************************************************************** Destructor diff --git a/src/common/io/filter/size.c b/src/common/io/filter/size.c index adac10cf1..b06d2b9e6 100644 --- a/src/common/io/filter/size.c +++ b/src/common/io/filter/size.c @@ -94,7 +94,7 @@ ioSizeNew(void) IoSize *driver = memNew(sizeof(IoSize)); driver->memContext = memContextCurrent(); - this = ioFilterNewP(SIZE_FILTER_TYPE_STR, driver, .in = ioSizeProcess, .result = ioSizeResult); + this = ioFilterNewP(SIZE_FILTER_TYPE_STR, driver, NULL, .in = ioSizeProcess, .result = ioSizeResult); } MEM_CONTEXT_NEW_END(); diff --git a/src/common/io/read.c b/src/common/io/read.c index 91ee8b580..7473f5ae9 100644 --- a/src/common/io/read.c +++ b/src/common/io/read.c @@ -56,6 +56,7 @@ ioReadNew(void *driver, IoReadInterface interface) this->memContext = memContextCurrent(); this->driver = driver; this->interface = interface; + this->filterGroup = ioFilterGroupNew(); this->input = bufNew(ioBufferSize()); } MEM_CONTEXT_NEW_END(); @@ -75,25 +76,14 @@ ioReadOpen(IoRead *this) ASSERT(this != NULL); ASSERT(!this->opened && !this->closed); + ASSERT(ioFilterGroupSize(this->filterGroup) == 0 || !ioReadBlock(this)); // Open if the driver has an open function bool result = this->interface.open != NULL ? this->interface.open(this->driver) : true; // Only open the filter group if the read was opened if (result) - { - // If no filter group exists create one to do buffering - if (this->filterGroup == NULL) - { - MEM_CONTEXT_BEGIN(this->memContext) - { - this->filterGroup = ioFilterGroupNew(); - } - MEM_CONTEXT_END(); - } - ioFilterGroupOpen(this->filterGroup); - } #ifdef DEBUG this->opened = result; @@ -365,9 +355,7 @@ ioReadEof(const IoRead *this) } /*********************************************************************************************************************************** -Get/set filters - -Filters must be set before open and cannot be reset. +Get filter group if filters need to be added ***********************************************************************************************************************************/ IoFilterGroup * ioReadFilterGroup(const IoRead *this) @@ -381,25 +369,6 @@ ioReadFilterGroup(const IoRead *this) FUNCTION_TEST_RETURN(this->filterGroup); } -IoRead * -ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup) -{ - FUNCTION_LOG_BEGIN(logLevelTrace); - FUNCTION_LOG_PARAM(IO_READ, this); - FUNCTION_LOG_PARAM(IO_FILTER_GROUP, filterGroup); - FUNCTION_LOG_END(); - - ASSERT(this != NULL); - ASSERT(filterGroup != NULL); - ASSERT(this->filterGroup == NULL); - ASSERT(!this->opened && !this->closed); - ASSERT(!ioReadBlock(this)); - - this->filterGroup = ioFilterGroupMove(filterGroup, this->memContext); - - FUNCTION_LOG_RETURN(IO_READ, this); -} - /*********************************************************************************************************************************** Handle (file descriptor) for the read object diff --git a/src/common/io/read.h b/src/common/io/read.h index 81db3e9c1..29f324da0 100644 --- a/src/common/io/read.h +++ b/src/common/io/read.h @@ -34,7 +34,6 @@ Getters/Setters bool ioReadBlock(const IoRead *this); bool ioReadEof(const IoRead *this); IoFilterGroup *ioReadFilterGroup(const IoRead *this); -IoRead *ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup); int ioReadHandle(const IoRead *this); /*********************************************************************************************************************************** diff --git a/src/common/io/write.c b/src/common/io/write.c index 6a49390b8..c6a5daed2 100644 --- a/src/common/io/write.c +++ b/src/common/io/write.c @@ -24,7 +24,7 @@ struct IoWrite Buffer *output; // Output buffer #ifdef DEBUG - bool filterGroupSet; // Was an IoFilterGroup set? + bool filterGroupSet; // Were filters set? bool opened; // Has the io been opened? bool closed; // Has the io been closed? #endif @@ -54,6 +54,7 @@ ioWriteNew(void *driver, IoWriteInterface interface) this->memContext = memContextCurrent(); this->driver = driver; this->interface = interface; + this->filterGroup = ioFilterGroupNew(); this->output = bufNew(ioBufferSize()); } MEM_CONTEXT_NEW_END(); @@ -77,16 +78,12 @@ ioWriteOpen(IoWrite *this) if (this->interface.open != NULL) this->interface.open(this->driver); - // If no filter group exists create one to do buffering - if (this->filterGroup == NULL) - { - MEM_CONTEXT_BEGIN(this->memContext) - { - this->filterGroup = ioFilterGroupNew(); - } - MEM_CONTEXT_END(); - } + // Track whether filters were added to prevent flush() from being called later since flush() won't work with most filters +#ifdef DEBUG + this->filterGroupSet = ioFilterGroupSize(this->filterGroup) > 0; +#endif + // Open the filter group ioFilterGroupOpen(this->filterGroup); #ifdef DEBUG @@ -287,29 +284,6 @@ ioWriteFilterGroup(const IoWrite *this) FUNCTION_TEST_RETURN(this->filterGroup); } -IoWrite * -ioWriteFilterGroupSet(IoWrite *this, IoFilterGroup *filterGroup) -{ - FUNCTION_LOG_BEGIN(logLevelTrace); - FUNCTION_LOG_PARAM(IO_WRITE, this); - FUNCTION_LOG_PARAM(IO_FILTER_GROUP, filterGroup); - FUNCTION_LOG_END(); - - ASSERT(this != NULL); - ASSERT(filterGroup != NULL); - ASSERT(this->filterGroup == NULL); - ASSERT(!this->opened && !this->closed); - - // Track whether a filter group was set to prevent flush() from being called later -#ifdef DEBUG - this->filterGroupSet = true; -#endif - - this->filterGroup = ioFilterGroupMove(filterGroup, this->memContext); - - FUNCTION_LOG_RETURN(IO_WRITE, this); -} - /*********************************************************************************************************************************** Handle (file descriptor) for the write object diff --git a/src/common/io/write.h b/src/common/io/write.h index 4ab8d2a6c..5c9553299 100644 --- a/src/common/io/write.h +++ b/src/common/io/write.h @@ -34,7 +34,6 @@ void ioWriteClose(IoWrite *this); Getters/Setters ***********************************************************************************************************************************/ IoFilterGroup *ioWriteFilterGroup(const IoWrite *this); -IoWrite *ioWriteFilterGroupSet(IoWrite *this, IoFilterGroup *filterGroup); int ioWriteHandle(const IoWrite *this); /*********************************************************************************************************************************** diff --git a/src/info/info.c b/src/info/info.c index 8fbceceaa..faaf2eb34 100644 --- a/src/info/info.c +++ b/src/info/info.c @@ -142,13 +142,13 @@ infoLoad(Info *this, const Storage *storage, const String *fileName, bool copyFi const String *fileNameExt = copyFile ? strNewFmt("%s" INFO_COPY_EXT, strPtr(fileName)) : fileName; // Attempt to load the file - StorageRead *infoRead = storageNewReadNP(storage, fileNameExt); + StorageRead *infoRead = storageNewReadP(storage, fileNameExt, .compressible = cipherType == cipherTypeNone); if (cipherType != cipherTypeNone) { - ioReadFilterGroupSet( - storageReadIo(infoRead), - ioFilterGroupAdd(ioFilterGroupNew(), cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(cipherPass), NULL))); + ioFilterGroupAdd( + ioReadFilterGroup(storageReadIo(infoRead)), + cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(cipherPass), NULL)); } // Load and parse the info file @@ -334,13 +334,12 @@ infoSave( iniSet(ini, INFO_SECTION_BACKREST_STR, INFO_KEY_CHECKSUM_STR, jsonFromStr(infoHash(ini))); // Save info file - IoWrite *infoWrite = storageWriteIo(storageNewWriteNP(storage, fileName)); + IoWrite *infoWrite = storageWriteIo(storageNewWriteP(storage, fileName, .compressible = cipherType == cipherTypeNone)); if (cipherType != cipherTypeNone) { - ioWriteFilterGroupSet( - infoWrite, - ioFilterGroupAdd(ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL))); + ioFilterGroupAdd( + ioWriteFilterGroup(infoWrite), cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL)); } iniSave(ini, infoWrite); diff --git a/src/storage/helper.c b/src/storage/helper.c index 4c572700a..4b5282f3d 100644 --- a/src/storage/helper.c +++ b/src/storage/helper.c @@ -278,7 +278,7 @@ storageRepoGet(const String *type, bool write) { result = storageRemoteNew( STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, write, storageRepoPathExpression, - protocolRemoteGet(protocolStorageTypeRepo)); + protocolRemoteGet(protocolStorageTypeRepo), cfgOptionUInt(cfgOptCompressLevelNetwork)); } // Use CIFS storage else if (strEqZ(type, STORAGE_TYPE_CIFS)) diff --git a/src/storage/posix/storage.c b/src/storage/posix/storage.c index e1e6c7868..9c31a5955 100644 --- a/src/storage/posix/storage.c +++ b/src/storage/posix/storage.c @@ -394,7 +394,7 @@ storagePosixMove(THIS_VOID, StorageRead *source, StorageWrite *destination) New file read object ***********************************************************************************************************************************/ static StorageRead * -storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing) +storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible) { THIS(StoragePosix); @@ -402,6 +402,7 @@ storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing) FUNCTION_LOG_PARAM(STORAGE_POSIX, this); FUNCTION_LOG_PARAM(STRING, file); FUNCTION_LOG_PARAM(BOOL, ignoreMissing); + (void)compressible; FUNCTION_LOG_END(); ASSERT(this != NULL); @@ -416,7 +417,7 @@ New file write object static StorageWrite * storagePosixNewWrite( THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified, - bool createPath, bool syncFile, bool syncPath, bool atomic) + bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible) { THIS(StoragePosix); @@ -432,6 +433,7 @@ storagePosixNewWrite( FUNCTION_LOG_PARAM(BOOL, syncFile); FUNCTION_LOG_PARAM(BOOL, syncPath); FUNCTION_LOG_PARAM(BOOL, atomic); + (void)compressible; FUNCTION_LOG_END(); ASSERT(this != NULL); diff --git a/src/storage/read.intern.h b/src/storage/read.intern.h index df8abb597..04edf8c6f 100644 --- a/src/storage/read.intern.h +++ b/src/storage/read.intern.h @@ -14,6 +14,8 @@ typedef struct StorageReadInterface { const String * type; const String * name; + bool compressible; // Is this file compressible? + unsigned int compressLevel; // Level to use for compression bool ignoreMissing; IoReadInterface ioInterface; } StorageReadInterface; diff --git a/src/storage/remote/protocol.c b/src/storage/remote/protocol.c index 20b1fb871..9c5cdaa98 100644 --- a/src/storage/remote/protocol.c +++ b/src/storage/remote/protocol.c @@ -3,6 +3,8 @@ Remote Storage Protocol Handler ***********************************************************************************************************************************/ #include "build.auto.h" +#include "common/compress/gzip/compress.h" +#include "common/compress/gzip/decompress.h" #include "common/debug.h" #include "common/io/io.h" #include "common/log.h" @@ -40,6 +42,38 @@ static struct RegExp *blockRegExp; // Regular expression to check block messages } storageRemoteProtocolLocal; +/*********************************************************************************************************************************** +Set filter group based on passed filters +***********************************************************************************************************************************/ +static void +storageRemoteFilterGroup(IoFilterGroup *filterGroup, const Variant *filterList) +{ + FUNCTION_TEST_BEGIN(); + FUNCTION_TEST_PARAM(IO_FILTER_GROUP, filterGroup); + FUNCTION_TEST_PARAM(VARIANT, filterList); + FUNCTION_TEST_END(); + + ASSERT(filterGroup != NULL); + ASSERT(filterList != NULL); + + const VariantList *filterKeyList = kvKeyList(varKv(filterList)); + + for (unsigned int filterIdx = 0; filterIdx < varLstSize(filterKeyList); filterIdx++) + { + const String *filterKey = varStr(varLstGet(filterKeyList, filterIdx)); + const VariantList *filterParam = varVarLst(kvGet(varKv(filterList), varLstGet(filterKeyList, filterIdx))); + + if (strEq(filterKey, GZIP_COMPRESS_FILTER_TYPE_STR)) + ioFilterGroupAdd(filterGroup, gzipCompressNewVar(filterParam)); + else if (strEq(filterKey, GZIP_DECOMPRESS_FILTER_TYPE_STR)) + ioFilterGroupAdd(filterGroup, gzipDecompressNewVar(filterParam)); + else + THROW_FMT(AssertError, "unable to add filter '%s'", strPtr(filterKey)); + } + + FUNCTION_TEST_RETURN_VOID(); +} + /*********************************************************************************************************************************** Process storage protocol requests ***********************************************************************************************************************************/ @@ -87,7 +121,10 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto // Create the read object IoRead *fileRead = storageReadIo( interface.newRead( - driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varBool(varLstGet(paramList, 1)))); + driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varBool(varLstGet(paramList, 1)), false)); + + // Set filter group based on passed filters + storageRemoteFilterGroup(ioReadFilterGroup(fileRead), varLstGet(paramList, 2)); // Check if the file exists bool exists = ioReadOpen(fileRead); @@ -127,7 +164,10 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varUIntForce(varLstGet(paramList, 1)), varUIntForce(varLstGet(paramList, 2)), varStr(varLstGet(paramList, 3)), varStr(varLstGet(paramList, 4)), (time_t)varIntForce(varLstGet(paramList, 5)), varBool(varLstGet(paramList, 6)), - varBool(varLstGet(paramList, 7)), varBool(varLstGet(paramList, 8)), varBool(varLstGet(paramList, 9)))); + varBool(varLstGet(paramList, 7)), varBool(varLstGet(paramList, 8)), varBool(varLstGet(paramList, 9)), false)); + + // Set filter group based on passed filters + storageRemoteFilterGroup(ioWriteFilterGroup(fileWrite), varLstGet(paramList, 10)); // Open file ioWriteOpen(fileWrite); diff --git a/src/storage/remote/read.c b/src/storage/remote/read.c index 2534863a7..89a602133 100644 --- a/src/storage/remote/read.c +++ b/src/storage/remote/read.c @@ -6,6 +6,8 @@ Remote Storage Read #include #include +#include "common/compress/gzip/compress.h" +#include "common/compress/gzip/decompress.h" #include "common/debug.h" #include "common/io/read.intern.h" #include "common/log.h" @@ -24,10 +26,15 @@ typedef struct StorageReadRemote MemContext *memContext; // Object mem context StorageReadInterface interface; // Interface StorageRemote *storage; // Storage that created this object + StorageRead *read; // Storage read interface ProtocolClient *client; // Protocol client for requests size_t remaining; // Bytes remaining to be read in block bool eof; // Has the file reached eof? + +#ifdef DEBUG + uint64_t protocolReadBytes; // How many bytes were read from the protocol layer? +#endif } StorageReadRemote; /*********************************************************************************************************************************** @@ -56,9 +63,25 @@ storageReadRemoteOpen(THIS_VOID) MEM_CONTEXT_TEMP_BEGIN() { + IoFilterGroup *filterGroup = ioFilterGroupNew(); + + // If the file is compressible add compression filter on the remote + if (this->interface.compressible) + ioFilterGroupAdd(filterGroup, gzipCompressNew((int)this->interface.compressLevel, true)); + ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR); protocolCommandParamAdd(command, VARSTR(this->interface.name)); protocolCommandParamAdd(command, VARBOOL(this->interface.ignoreMissing)); + protocolCommandParamAdd(command, ioFilterGroupParamAll(filterGroup)); + + // If the file is compressible add decompression filter locally + if (this->interface.compressible) + { + // Since we can't insert filters yet we'll just error if there are already filters in the list + CHECK(ioFilterGroupSize(ioReadFilterGroup(storageReadIo(this->read))) == 0); + + ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(this->read)), gzipDecompressNew(true)); + } result = varBool(protocolClientExecute(this->client, command, true)); } @@ -100,6 +123,10 @@ storageReadRemote(THIS_VOID, Buffer *buffer, bool block) if (this->remaining == 0) this->eof = true; + +#ifdef DEBUG + this->protocolReadBytes += this->remaining; +#endif } MEM_CONTEXT_TEMP_END(); } @@ -147,30 +174,36 @@ storageReadRemoteEof(THIS_VOID) New object ***********************************************************************************************************************************/ StorageRead * -storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing) +storageReadRemoteNew( + StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible, + unsigned int compressLevel) { FUNCTION_LOG_BEGIN(logLevelTrace); FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage); FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client); FUNCTION_LOG_PARAM(STRING, name); FUNCTION_LOG_PARAM(BOOL, ignoreMissing); + FUNCTION_LOG_PARAM(BOOL, compressible); + FUNCTION_LOG_PARAM(UINT, compressLevel); FUNCTION_LOG_END(); ASSERT(storage != NULL); ASSERT(client != NULL); ASSERT(name != NULL); - StorageRead *this = NULL; + StorageReadRemote *this = NULL; MEM_CONTEXT_NEW_BEGIN("StorageReadRemote") { - StorageReadRemote *driver = memNew(sizeof(StorageReadRemote)); - driver->memContext = MEM_CONTEXT_NEW(); + this = memNew(sizeof(StorageReadRemote)); + this->memContext = MEM_CONTEXT_NEW(); - driver->interface = (StorageReadInterface) + this->interface = (StorageReadInterface) { .type = STORAGE_REMOTE_TYPE_STR, .name = strDup(name), + .compressible = compressible, + .compressLevel = compressLevel, .ignoreMissing = ignoreMissing, .ioInterface = (IoReadInterface) @@ -181,12 +214,13 @@ storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const Strin }, }; - driver->storage = storage; - driver->client = client; + this->storage = storage; + this->client = client; - this = storageReadNew(driver, &driver->interface); + this->read = storageReadNew(this, &this->interface); } MEM_CONTEXT_NEW_END(); - FUNCTION_LOG_RETURN(STORAGE_READ, this); + ASSERT(this != NULL); + FUNCTION_LOG_RETURN(STORAGE_READ, this->read); } diff --git a/src/storage/remote/read.h b/src/storage/remote/read.h index 961d660c6..277f1f61e 100644 --- a/src/storage/remote/read.h +++ b/src/storage/remote/read.h @@ -11,6 +11,8 @@ Remote Storage Read /*********************************************************************************************************************************** Constructor ***********************************************************************************************************************************/ -StorageRead *storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing); +StorageRead *storageReadRemoteNew( + StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible, + unsigned int compressLevel); #endif diff --git a/src/storage/remote/storage.c b/src/storage/remote/storage.c index 97ad41ef5..d37b43178 100644 --- a/src/storage/remote/storage.c +++ b/src/storage/remote/storage.c @@ -24,6 +24,7 @@ struct StorageRemote { MemContext *memContext; ProtocolClient *client; // Protocol client + unsigned int compressLevel; // Protocol compression level }; /*********************************************************************************************************************************** @@ -113,7 +114,7 @@ storageRemoteList(THIS_VOID, const String *path, const String *expression) New file read object ***********************************************************************************************************************************/ static StorageRead * -storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing) +storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible) { THIS(StorageRemote); @@ -121,11 +122,15 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing) FUNCTION_LOG_PARAM(STORAGE_REMOTE, this); FUNCTION_LOG_PARAM(STRING, file); FUNCTION_LOG_PARAM(BOOL, ignoreMissing); + FUNCTION_LOG_PARAM(BOOL, compressible); FUNCTION_LOG_END(); ASSERT(this != NULL); - FUNCTION_LOG_RETURN(STORAGE_READ, storageReadRemoteNew(this, this->client, file, ignoreMissing)); + FUNCTION_LOG_RETURN( + STORAGE_READ, + storageReadRemoteNew( + this, this->client, file, ignoreMissing, this->compressLevel > 0 ? compressible : false, this->compressLevel)); } /*********************************************************************************************************************************** @@ -134,7 +139,7 @@ New file write object static StorageWrite * storageRemoteNewWrite( THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified, - bool createPath, bool syncFile, bool syncPath, bool atomic) + bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible) { THIS(StorageRemote); @@ -150,6 +155,7 @@ storageRemoteNewWrite( FUNCTION_LOG_PARAM(BOOL, syncFile); FUNCTION_LOG_PARAM(BOOL, syncPath); FUNCTION_LOG_PARAM(BOOL, atomic); + FUNCTION_LOG_PARAM(BOOL, compressible); FUNCTION_LOG_END(); ASSERT(this != NULL); @@ -158,7 +164,8 @@ storageRemoteNewWrite( FUNCTION_LOG_RETURN( STORAGE_WRITE, storageWriteRemoteNew( - this, this->client, file, modeFile, modePath, user, group, timeModified, createPath, syncFile, syncPath, atomic)); + this, this->client, file, modeFile, modePath, user, group, timeModified, createPath, syncFile, syncPath, atomic, + this->compressLevel > 0 ? compressible : false, this->compressLevel)); } /*********************************************************************************************************************************** @@ -319,7 +326,8 @@ New object ***********************************************************************************************************************************/ Storage * storageRemoteNew( - mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client) + mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client, + unsigned int compressLevel) { FUNCTION_LOG_BEGIN(logLevelDebug); FUNCTION_LOG_PARAM(MODE, modeFile); @@ -327,6 +335,7 @@ storageRemoteNew( FUNCTION_LOG_PARAM(BOOL, write); FUNCTION_LOG_PARAM(FUNCTIONP, pathExpressionFunction); FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client); + FUNCTION_LOG_PARAM(UINT, compressLevel); FUNCTION_LOG_END(); ASSERT(modeFile != 0); @@ -340,6 +349,7 @@ storageRemoteNew( StorageRemote *driver = memNew(sizeof(StorageRemote)); driver->memContext = MEM_CONTEXT_NEW(); driver->client = client; + driver->compressLevel = compressLevel; uint64_t feature = 0; diff --git a/src/storage/remote/storage.h b/src/storage/remote/storage.h index 591002ada..67a8238f3 100644 --- a/src/storage/remote/storage.h +++ b/src/storage/remote/storage.h @@ -17,6 +17,7 @@ Storage type Constructor ***********************************************************************************************************************************/ Storage *storageRemoteNew( - mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client); + mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client, + unsigned int compressLevel); #endif diff --git a/src/storage/remote/write.c b/src/storage/remote/write.c index 3715d84e8..8f98b9775 100644 --- a/src/storage/remote/write.c +++ b/src/storage/remote/write.c @@ -3,6 +3,8 @@ Remote Storage File write ***********************************************************************************************************************************/ #include "build.auto.h" +#include "common/compress/gzip/compress.h" +#include "common/compress/gzip/decompress.h" #include "common/debug.h" #include "common/io/write.intern.h" #include "common/log.h" @@ -23,7 +25,12 @@ typedef struct StorageWriteRemote MemContext *memContext; // Object mem context StorageWriteInterface interface; // Interface StorageRemote *storage; // Storage that created this object + StorageWrite *write; // Storage write interface ProtocolClient *client; // Protocol client to make requests with + +#ifdef DEBUG + uint64_t protocolWriteBytes; // How many bytes were written to the protocol layer? +#endif } StorageWriteRemote; /*********************************************************************************************************************************** @@ -61,6 +68,12 @@ storageWriteRemoteOpen(THIS_VOID) MEM_CONTEXT_TEMP_BEGIN() { + IoFilterGroup *filterGroup = ioFilterGroupNew(); + + // If the file is compressible add decompression filter on the remote + if (this->interface.compressible) + ioFilterGroupAdd(filterGroup, gzipDecompressNew(true)); + ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR); protocolCommandParamAdd(command, VARSTR(this->interface.name)); protocolCommandParamAdd(command, VARUINT(this->interface.modeFile)); @@ -72,6 +85,14 @@ storageWriteRemoteOpen(THIS_VOID) protocolCommandParamAdd(command, VARBOOL(this->interface.syncFile)); protocolCommandParamAdd(command, VARBOOL(this->interface.syncPath)); protocolCommandParamAdd(command, VARBOOL(this->interface.atomic)); + protocolCommandParamAdd(command, ioFilterGroupParamAll(filterGroup)); + + // If the file is compressible add compression filter locally + if (this->interface.compressible) + { + ioFilterGroupAdd( + ioWriteFilterGroup(storageWriteIo(this->write)), gzipCompressNew((int)this->interface.compressLevel, true)); + } protocolClientExecute(this->client, command, false); @@ -103,6 +124,10 @@ storageWriteRemote(THIS_VOID, const Buffer *buffer) ioWrite(protocolClientIoWrite(this->client), buffer); ioWriteFlush(protocolClientIoWrite(this->client)); +#ifdef DEBUG + this->protocolWriteBytes += bufUsed(buffer); +#endif + FUNCTION_LOG_RETURN_VOID(); } @@ -140,7 +165,8 @@ Create a new file StorageWrite * storageWriteRemoteNew( StorageRemote *storage, ProtocolClient *client, const String *name, mode_t modeFile, mode_t modePath, const String *user, - const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic) + const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible, + unsigned int compressLevel) { FUNCTION_LOG_BEGIN(logLevelTrace); FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage); @@ -154,6 +180,8 @@ storageWriteRemoteNew( FUNCTION_LOG_PARAM(BOOL, syncFile); FUNCTION_LOG_PARAM(BOOL, syncPath); FUNCTION_LOG_PARAM(BOOL, atomic); + FUNCTION_LOG_PARAM(BOOL, compressible); + FUNCTION_LOG_PARAM(UINT, compressLevel); FUNCTION_LOG_END(); ASSERT(storage != NULL); @@ -162,18 +190,20 @@ storageWriteRemoteNew( ASSERT(modeFile != 0); ASSERT(modePath != 0); - StorageWrite *this = NULL; + StorageWriteRemote *this = NULL; MEM_CONTEXT_NEW_BEGIN("StorageWriteRemote") { - StorageWriteRemote *driver = memNew(sizeof(StorageWriteRemote)); - driver->memContext = MEM_CONTEXT_NEW(); + this = memNew(sizeof(StorageWriteRemote)); + this->memContext = MEM_CONTEXT_NEW(); - driver->interface = (StorageWriteInterface) + this->interface = (StorageWriteInterface) { .type = STORAGE_REMOTE_TYPE_STR, .name = strDup(name), .atomic = atomic, + .compressible = compressible, + .compressLevel = compressLevel, .createPath = createPath, .group = strDup(group), .modeFile = modeFile, @@ -191,12 +221,13 @@ storageWriteRemoteNew( }, }; - driver->storage = storage; - driver->client = client; + this->storage = storage; + this->client = client; - this = storageWriteNew(driver, &driver->interface); + this->write = storageWriteNew(this, &this->interface); } MEM_CONTEXT_NEW_END(); - FUNCTION_LOG_RETURN(STORAGE_WRITE, this); + ASSERT(this != NULL); + FUNCTION_LOG_RETURN(STORAGE_WRITE, this->write); } diff --git a/src/storage/remote/write.h b/src/storage/remote/write.h index 725bedff9..1dec81ce1 100644 --- a/src/storage/remote/write.h +++ b/src/storage/remote/write.h @@ -13,6 +13,7 @@ Constructor ***********************************************************************************************************************************/ StorageWrite *storageWriteRemoteNew( StorageRemote *storage, ProtocolClient *client, const String *name, mode_t modeFile, mode_t modePath, const String *user, - const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic); + const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible, + unsigned int compressLevel); #endif diff --git a/src/storage/s3/storage.c b/src/storage/s3/storage.c index de63b43e9..9d54b6b1f 100644 --- a/src/storage/s3/storage.c +++ b/src/storage/s3/storage.c @@ -657,7 +657,7 @@ storageS3List(THIS_VOID, const String *path, const String *expression) New file read object ***********************************************************************************************************************************/ static StorageRead * -storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing) +storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible) { THIS(StorageS3); @@ -665,6 +665,7 @@ storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing) FUNCTION_LOG_PARAM(STORAGE_S3, this); FUNCTION_LOG_PARAM(STRING, file); FUNCTION_LOG_PARAM(BOOL, ignoreMissing); + (void)compressible; FUNCTION_LOG_END(); ASSERT(this != NULL); @@ -679,7 +680,7 @@ New file write object static StorageWrite * storageS3NewWrite( THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified, - bool createPath, bool syncFile, bool syncPath, bool atomic) + bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible) { THIS(StorageS3); @@ -695,6 +696,7 @@ storageS3NewWrite( FUNCTION_LOG_PARAM(BOOL, syncFile); FUNCTION_LOG_PARAM(BOOL, syncPath); FUNCTION_LOG_PARAM(BOOL, atomic); + (void)compressible; FUNCTION_LOG_END(); ASSERT(this != NULL); diff --git a/src/storage/storage.c b/src/storage/storage.c index 3f4236117..ec82d96f0 100644 --- a/src/storage/storage.c +++ b/src/storage/storage.c @@ -407,7 +407,7 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p FUNCTION_LOG_PARAM(STORAGE, this); FUNCTION_LOG_PARAM(STRING, fileExp); FUNCTION_LOG_PARAM(BOOL, param.ignoreMissing); - FUNCTION_LOG_PARAM(IO_FILTER_GROUP, param.filterGroup); + FUNCTION_LOG_PARAM(BOOL, param.compressible); FUNCTION_LOG_END(); ASSERT(this != NULL); @@ -416,12 +416,9 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p MEM_CONTEXT_TEMP_BEGIN() { - result = this->interface.newRead(this->driver, storagePathNP(this, fileExp), param.ignoreMissing); - - if (param.filterGroup != NULL) - ioReadFilterGroupSet(storageReadIo(result), param.filterGroup); - - storageReadMove(result, MEM_CONTEXT_OLD()); + result = storageReadMove( + this->interface.newRead(this->driver, storagePathNP(this, fileExp), param.ignoreMissing, param.compressible), + MEM_CONTEXT_OLD()); } MEM_CONTEXT_TEMP_END(); @@ -446,7 +443,7 @@ storageNewWrite(const Storage *this, const String *fileExp, StorageNewWriteParam FUNCTION_LOG_PARAM(BOOL, param.noSyncFile); FUNCTION_LOG_PARAM(BOOL, param.noSyncPath); FUNCTION_LOG_PARAM(BOOL, param.noAtomic); - FUNCTION_LOG_PARAM(IO_FILTER_GROUP, param.filterGroup); + FUNCTION_LOG_PARAM(BOOL, param.compressible); FUNCTION_LOG_END(); ASSERT(this != NULL); @@ -456,15 +453,12 @@ storageNewWrite(const Storage *this, const String *fileExp, StorageNewWriteParam MEM_CONTEXT_TEMP_BEGIN() { - result = this->interface.newWrite( - this->driver, storagePathNP(this, fileExp), param.modeFile != 0 ? param.modeFile : this->modeFile, - param.modePath != 0 ? param.modePath : this->modePath, param.user, param.group, param.timeModified, !param.noCreatePath, - !param.noSyncFile, !param.noSyncPath, !param.noAtomic); - - if (param.filterGroup != NULL) - ioWriteFilterGroupSet(storageWriteIo(result), param.filterGroup); - - storageWriteMove(result, MEM_CONTEXT_OLD()); + result = storageWriteMove( + this->interface.newWrite( + this->driver, storagePathNP(this, fileExp), param.modeFile != 0 ? param.modeFile : this->modeFile, + param.modePath != 0 ? param.modePath : this->modePath, param.user, param.group, param.timeModified, + !param.noCreatePath, !param.noSyncFile, !param.noSyncPath, !param.noAtomic, param.compressible), + MEM_CONTEXT_OLD()); } MEM_CONTEXT_TEMP_END(); diff --git a/src/storage/storage.h b/src/storage/storage.h index 8b3c27b20..4b53e08a5 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -146,7 +146,7 @@ storageNewRead typedef struct StorageNewReadParam { bool ignoreMissing; - IoFilterGroup *filterGroup; + bool compressible; } StorageNewReadParam; #define storageNewReadP(this, pathExp, ...) \ @@ -170,7 +170,7 @@ typedef struct StorageNewWriteParam bool noSyncFile; bool noSyncPath; bool noAtomic; - IoFilterGroup *filterGroup; + bool compressible; } StorageNewWriteParam; #define storageNewWriteP(this, pathExp, ...) \ diff --git a/src/storage/storage.intern.h b/src/storage/storage.intern.h index cadaac644..aab77cf96 100644 --- a/src/storage/storage.intern.h +++ b/src/storage/storage.intern.h @@ -57,15 +57,16 @@ typedef struct StorageInterface // Features implemented by the storage driver uint64_t feature; + bool (*copy)(StorageRead *source, StorageWrite *destination); bool (*exists)(void *driver, const String *file); StorageInfo (*info)(void *driver, const String *path, bool followLink); bool (*infoList)(void *driver, const String *file, StorageInfoListCallback callback, void *callbackData); StringList *(*list)(void *driver, const String *path, const String *expression); bool (*move)(void *driver, StorageRead *source, StorageWrite *destination); - StorageRead *(*newRead)(void *driver, const String *file, bool ignoreMissing); + StorageRead *(*newRead)(void *driver, const String *file, bool ignoreMissing, bool compressible); StorageWrite *(*newWrite)( void *driver, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, - time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic); + time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible); void (*pathCreate)(void *driver, const String *path, bool errorOnExists, bool noParentCreate, mode_t mode); bool (*pathExists)(void *driver, const String *path); bool (*pathRemove)(void *driver, const String *path, bool recurse); diff --git a/src/storage/write.intern.h b/src/storage/write.intern.h index 53b8ebe2f..b36820ad4 100644 --- a/src/storage/write.intern.h +++ b/src/storage/write.intern.h @@ -23,6 +23,8 @@ typedef struct StorageWriteInterface bool atomic; bool createPath; + bool compressible; // Is this file compressible? + unsigned int compressLevel; // Level to use for compression const String *group; // Group that owns the file mode_t modeFile; mode_t modePath; diff --git a/test/define.yaml b/test/define.yaml index cde13aa8a..4a9ab6eb2 100644 --- a/test/define.yaml +++ b/test/define.yaml @@ -534,6 +534,8 @@ unit: storage/remote/storage: full storage/remote/write: full storage/helper: full + storage/read: full + storage/write: full storage/storage: full # ---------------------------------------------------------------------------------------------------------------------------- diff --git a/test/src/module/command/archiveGetTest.c b/test/src/module/command/archiveGetTest.c index b8926992c..88bf072a9 100644 --- a/test/src/module/command/archiveGetTest.c +++ b/test/src/module/command/archiveGetTest.c @@ -186,10 +186,9 @@ testRun(void) // ------------------------------------------------------------------------------------------------------------------------- StorageWrite *infoWrite = storageNewWriteNP(storageTest, strNew("repo/archive/test1/archive.info")); - ioWriteFilterGroupSet( - storageWriteIo(infoWrite), - ioFilterGroupAdd( - ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("12345678"), NULL))); + ioFilterGroupAdd( + ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, + BUFSTRDEF("12345678"), NULL)); storagePutNP( infoWrite, @@ -208,11 +207,10 @@ testRun(void) strNew( "repo/archive/test1/10-1/01ABCDEF01ABCDEF/01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.gz")); - IoFilterGroup *filterGroup = ioFilterGroupNew(); + IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(destination)); ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false)); ioFilterGroupAdd( filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("worstpassphraseever"), NULL)); - ioWriteFilterGroupSet(storageWriteIo(destination), filterGroup); storagePutNP(destination, buffer); TEST_RESULT_INT( diff --git a/test/src/module/command/archivePushTest.c b/test/src/module/command/archivePushTest.c index 288281a61..66bd031bd 100644 --- a/test/src/module/command/archivePushTest.c +++ b/test/src/module/command/archivePushTest.c @@ -373,10 +373,9 @@ testRun(void) StorageWrite *infoWrite = storageNewWriteNP(storageTest, strNew("repo/archive/test/archive.info")); - ioWriteFilterGroupSet( - storageWriteIo(infoWrite), - ioFilterGroupAdd( - ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("badpassphrase"), NULL))); + ioFilterGroupAdd( + ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, + BUFSTRDEF("badpassphrase"), NULL)); storagePutNP( infoWrite, diff --git a/test/src/module/command/backupCommonTest.c b/test/src/module/command/backupCommonTest.c index 64637b50d..904e65c6f 100644 --- a/test/src/module/command/backupCommonTest.c +++ b/test/src/module/command/backupCommonTest.c @@ -139,9 +139,8 @@ testRun(void) bufUsedSet(buffer, bufSize(buffer)); memset(bufPtr(buffer), 0, bufSize(buffer)); - IoWrite *write = ioWriteFilterGroupSet( - ioBufferWriteNew(bufferOut), - ioFilterGroupAdd(ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0))); + IoWrite *write = ioBufferWriteNew(bufferOut); + ioFilterGroupAdd(ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0)); ioWriteOpen(write); ioWrite(write, buffer); ioWriteClose(write); @@ -161,10 +160,9 @@ testRun(void) ((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x00)))->pd_lsn.walid = 0xF0F0F0F0; ((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x00)))->pd_lsn.xrecoff = 0xF0F0F0F0; - write = ioWriteFilterGroupSet( - ioBufferWriteNew(bufferOut), - ioFilterGroupAdd( - ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000))); + write = ioBufferWriteNew(bufferOut); + ioFilterGroupAdd( + ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)); ioWriteOpen(write); ioWrite(write, buffer); ioWriteClose(write); @@ -209,10 +207,9 @@ testRun(void) ((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x07)))->pd_upper = 0x01; ((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x07)))->pd_lsn.xrecoff = 0x7; - write = ioWriteFilterGroupSet( - ioBufferWriteNew(bufferOut), - ioFilterGroupAdd( - ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000))); + write = ioBufferWriteNew(bufferOut); + ioFilterGroupAdd( + ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)); ioWriteOpen(write); ioWrite(write, buffer); ioWriteClose(write); @@ -227,10 +224,9 @@ testRun(void) bufUsedSet(buffer, bufSize(buffer)); memset(bufPtr(buffer), 0, bufSize(buffer)); - write = ioWriteFilterGroupSet( - ioBufferWriteNew(bufferOut), - ioFilterGroupAdd( - ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000))); + write = ioBufferWriteNew(bufferOut); + ioFilterGroupAdd( + ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)); ioWriteOpen(write); ioWrite(write, buffer); ioWriteClose(write); @@ -245,10 +241,9 @@ testRun(void) bufUsedSet(buffer, bufSize(buffer)); memset(bufPtr(buffer), 0, bufSize(buffer)); - write = ioWriteFilterGroupSet( - ioBufferWriteNew(bufferOut), - ioFilterGroupAdd( - ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000))); + write = ioBufferWriteNew(bufferOut); + ioFilterGroupAdd( + ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)); ioWriteOpen(write); ioWrite(write, buffer); TEST_ERROR(ioWrite(write, buffer), AssertError, "should not be possible to see two misaligned pages in a row"); diff --git a/test/src/module/command/restoreTest.c b/test/src/module/command/restoreTest.c index 2424ee0d2..0e280d7c5 100644 --- a/test/src/module/command/restoreTest.c +++ b/test/src/module/command/restoreTest.c @@ -65,10 +65,9 @@ testRun(void) // Create a compressed encrypted repo file StorageWrite *ceRepoFile = storageNewWriteNP( storageRepoWrite(), strNewFmt(STORAGE_REPO_BACKUP "/%s/%s.gz", strPtr(repoFileReferenceFull), strPtr(repoFile1))); - IoFilterGroup *filterGroup = ioFilterGroupNew(); + IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(ceRepoFile)); ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false)); ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("badpass"), NULL)); - ioWriteFilterGroupSet(storageWriteIo(ceRepoFile), filterGroup); storagePutNP(ceRepoFile, BUFSTRDEF("acefile")); diff --git a/test/src/module/common/compressGzipTest.c b/test/src/module/common/compressGzipTest.c index ad47e5b6d..ddfe4879a 100644 --- a/test/src/module/common/compressGzipTest.c +++ b/test/src/module/common/compressGzipTest.c @@ -16,10 +16,8 @@ testCompress(IoFilter *compress, Buffer *decompressed, size_t inputSize, size_t size_t inputTotal = 0; ioBufferSizeSet(outputSize); - IoFilterGroup *filterGroup = ioFilterGroupNew(); - ioFilterGroupAdd(filterGroup, compress); IoWrite *write = ioBufferWriteNew(compressed); - ioWriteFilterGroupSet(write, filterGroup); + ioFilterGroupAdd(ioWriteFilterGroup(write), compress); ioWriteOpen(write); // Compress input data @@ -52,10 +50,8 @@ testDecompress(IoFilter *decompress, Buffer *compressed, size_t inputSize, size_ Buffer *output = bufNew(outputSize); ioBufferSizeSet(inputSize); - IoFilterGroup *filterGroup = ioFilterGroupNew(); - ioFilterGroupAdd(filterGroup, decompress); IoRead *read = ioBufferReadNew(compressed); - ioReadFilterGroupSet(read, filterGroup); + ioFilterGroupAdd(ioReadFilterGroup(read), decompress); ioReadOpen(read); while (!ioReadEof(read)) @@ -110,8 +106,15 @@ testRun(void) Buffer *compressed = NULL; Buffer *decompressed = bufNewC(simpleData, strlen(simpleData)); + VariantList *compressParamList = varLstNew(); + varLstAdd(compressParamList, varNewUInt(3)); + varLstAdd(compressParamList, varNewBool(false)); + + VariantList *decompressParamList = varLstNew(); + varLstAdd(decompressParamList, varNewBool(false)); + TEST_ASSIGN( - compressed, testCompress(gzipCompressNew(3, false), decompressed, 1024, 1024), + compressed, testCompress(gzipCompressNewVar(compressParamList), decompressed, 1024, 1024), "simple data - compress large in/large out buffer"); TEST_RESULT_BOOL( @@ -127,7 +130,7 @@ testRun(void) "simple data - compress small in/small out buffer"); TEST_RESULT_BOOL( - bufEq(decompressed, testDecompress(gzipDecompressNew(false), compressed, 1024, 1024)), true, + bufEq(decompressed, testDecompress(gzipDecompressNewVar(decompressParamList), compressed, 1024, 1024)), true, "simple data - decompress large in/large out buffer"); TEST_RESULT_BOOL( diff --git a/test/src/module/common/ioTest.c b/test/src/module/common/ioTest.c index bba1d0595..84924d570 100644 --- a/test/src/module/common/ioTest.c +++ b/test/src/module/common/ioTest.c @@ -3,6 +3,8 @@ Test IO ***********************************************************************************************************************************/ #include +#include "common/type/json.h" + #include "common/harnessFork.h" /*********************************************************************************************************************************** @@ -111,7 +113,7 @@ ioTestFilterSizeNew(const char *type) IoTestFilterSize *driver = memNew(sizeof(IoTestFilterSize)); driver->memContext = MEM_CONTEXT_NEW(); - this = ioFilterNewP(strNew(type), driver, .in = ioTestFilterSizeProcess, .result = ioTestFilterSizeResult); + this = ioFilterNewP(strNew(type), driver, NULL, .in = ioTestFilterSizeProcess, .result = ioTestFilterSizeResult); } MEM_CONTEXT_NEW_END(); @@ -216,8 +218,13 @@ ioTestFilterMultiplyNew(const char *type, unsigned int multiplier, unsigned int driver->flushTotal = flushTotal; driver->flushChar = flushChar; + VariantList *paramList = varLstNew(); + varLstAdd(paramList, varNewStrZ(type)); + varLstAdd(paramList, varNewUInt(multiplier)); + varLstAdd(paramList, varNewUInt(flushTotal)); + this = ioFilterNewP( - strNew(type), driver, .done = ioTestFilterMultiplyDone, .inOut = ioTestFilterMultiplyProcess, + strNew(type), driver, paramList, .done = ioTestFilterMultiplyDone, .inOut = ioTestFilterMultiplyProcess, .inputSame = ioTestFilterMultiplyInputSame); } MEM_CONTEXT_NEW_END(); @@ -285,18 +292,19 @@ testRun(void) bufferOriginal = bufNewC("123", 3); TEST_ASSIGN(bufferRead, ioBufferReadNew(bufferOriginal), "create buffer read object"); - IoFilterGroup *filterGroup = NULL; - TEST_RESULT_VOID(ioFilterGroupMove(filterGroup, memContextTop()), "move null filter group is a noop"); - TEST_ASSIGN(filterGroup, ioFilterGroupNew(), " create new filter group"); IoFilter *sizeFilter = ioSizeNew(); - TEST_RESULT_PTR(ioFilterGroupAdd(filterGroup, sizeFilter), filterGroup, " add filter to filter group"); + TEST_RESULT_PTR( + ioFilterGroupAdd(ioReadFilterGroup(bufferRead), sizeFilter), bufferRead->filterGroup, " add filter to filter group"); TEST_RESULT_VOID( - ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("double", 2, 3, 'X')), " add filter to filter group"); - TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, ioSizeNew()), " add filter to filter group"); + ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioTestFilterMultiplyNew("double", 2, 3, 'X')), + " add filter to filter group"); + TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioSizeNew()), " add filter to filter group"); IoFilter *bufferFilter = ioBufferNew(); - TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, bufferFilter), " add filter to filter group"); - TEST_RESULT_VOID(ioReadFilterGroupSet(bufferRead, filterGroup), " add filter group to read io"); + TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), bufferFilter), " add filter to filter group"); TEST_RESULT_PTR(ioFilterMove(NULL, memContextTop()), NULL, " move NULL filter to top context"); + TEST_RESULT_STR( + strPtr(jsonFromVar(ioFilterGroupParamAll(ioReadFilterGroup(bufferRead)), 0)), + "{\"buffer\":null,\"double\":[\"double\",2,3],\"size\":null}", " check filter params"); TEST_RESULT_BOOL(ioReadOpen(bufferRead), true, " open"); TEST_RESULT_INT(ioReadHandle(bufferRead), -1, " handle invalid"); @@ -322,21 +330,26 @@ testRun(void) TEST_RESULT_BOOL(ioBufferRead(ioReadDriver(bufferRead), buffer, true), 0, " eof from driver"); TEST_RESULT_SIZE(ioRead(bufferRead, buffer), 0, " read 0 bytes"); TEST_RESULT_VOID(ioReadClose(bufferRead), " close buffer read object"); + TEST_RESULT_STR( + strPtr(jsonFromVar(ioFilterGroupResultAll(ioReadFilterGroup(bufferRead)), 0)), + "{\"buffer\":null,\"double\":null,\"size\":[3,9]}", + " check filter result all"); - TEST_RESULT_PTR(ioReadFilterGroup(bufferRead), filterGroup, " check filter group"); + TEST_RESULT_PTR(ioReadFilterGroup(bufferRead), ioReadFilterGroup(bufferRead), " check filter group"); TEST_RESULT_UINT( - varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 0)), 3, + varUInt64(varLstGet(varVarLst(ioFilterGroupResult(ioReadFilterGroup(bufferRead), ioFilterType(sizeFilter))), 0)), 3, " check filter result"); - TEST_RESULT_PTR(ioFilterGroupResult(filterGroup, strNew("double")), NULL, " check filter result is NULL"); + TEST_RESULT_PTR( + ioFilterGroupResult(ioReadFilterGroup(bufferRead), strNew("double")), NULL, " check filter result is NULL"); TEST_RESULT_UINT( - varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 1)), 9, + varUInt64(varLstGet(varVarLst(ioFilterGroupResult(ioReadFilterGroup(bufferRead), ioFilterType(sizeFilter))), 1)), 9, " check filter result"); TEST_RESULT_PTR(ioFilterDriver(bufferFilter), bufferFilter->driver, " check filter driver"); TEST_RESULT_PTR(ioFilterInterface(bufferFilter), &bufferFilter->interface, " check filter interface"); TEST_RESULT_VOID(ioFilterFree(bufferFilter), " free buffer filter"); - TEST_RESULT_VOID(ioFilterGroupFree(filterGroup), " free filter group object"); + TEST_RESULT_VOID(ioFilterGroupFree(ioReadFilterGroup(bufferRead)), " free filter group object"); // Read a zero-size buffer to ensure filters are still processed even when there is no input. Some filters (e.g. encryption // and compression) will produce output even if there is no input. @@ -347,7 +360,7 @@ testRun(void) TEST_ASSIGN(bufferRead, ioBufferReadNew(bufferOriginal), "create buffer read object"); TEST_RESULT_VOID( - ioReadFilterGroupSet(bufferRead, ioFilterGroupAdd(ioFilterGroupNew(), ioTestFilterMultiplyNew("double", 2, 5, 'Y'))), + ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioTestFilterMultiplyNew("double", 2, 5, 'Y')), " add filter that produces output with no input"); TEST_RESULT_BOOL(ioReadOpen(bufferRead), true, " open read"); TEST_RESULT_UINT(ioRead(bufferRead, buffer), 5, " read 5 chars"); @@ -437,8 +450,7 @@ testRun(void) Buffer *buffer = bufNew(0); TEST_ASSIGN(bufferWrite, ioBufferWriteNew(buffer), "create buffer write object"); - IoFilterGroup *filterGroup = NULL; - TEST_ASSIGN(filterGroup, ioFilterGroupNew(), " create new filter group"); + IoFilterGroup *filterGroup = ioWriteFilterGroup(bufferWrite); IoFilter *sizeFilter = ioSizeNew(); TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, sizeFilter), " add filter to filter group"); TEST_RESULT_VOID( @@ -447,7 +459,6 @@ testRun(void) ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("single", 1, 1, 'Y')), " add filter to filter group"); TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, ioTestFilterSizeNew("size2")), " add filter to filter group"); - TEST_RESULT_VOID(ioWriteFilterGroupSet(bufferWrite, filterGroup), " add filter group to write io"); TEST_RESULT_VOID(ioWriteOpen(bufferWrite), " open buffer write object"); TEST_RESULT_INT(ioWriteHandle(bufferWrite), -1, " handle invalid"); diff --git a/test/src/module/info/infoTest.c b/test/src/module/info/infoTest.c index 09e1ec02e..eb46d51cb 100644 --- a/test/src/module/info/infoTest.c +++ b/test/src/module/info/infoTest.c @@ -62,10 +62,9 @@ testRun(void) //-------------------------------------------------------------------------------------------------------------------------- StorageWrite *infoWrite = storageNewWriteNP(storageLocalWrite(), fileName); - ioWriteFilterGroupSet( - storageWriteIo(infoWrite), - ioFilterGroupAdd( - ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("12345678"), NULL))); + ioFilterGroupAdd( + ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, + BUFSTRDEF("12345678"), NULL)); storageRemoveNP(storageLocalWrite(), fileNameCopy); storagePutNP( diff --git a/test/src/module/storage/posixTest.c b/test/src/module/storage/posixTest.c index 2b6da9eeb..e82c88ece 100644 --- a/test/src/module/storage/posixTest.c +++ b/test/src/module/storage/posixTest.c @@ -604,11 +604,6 @@ testRun(void) TEST_RESULT_INT( ioReadHandle(storageReadIo(file)), ((StorageReadPosix *)file->driver)->handle, "check read handle"); TEST_RESULT_VOID(ioReadClose(storageReadIo(file)), " close file"); - - // ------------------------------------------------------------------------------------------------------------------------- - IoFilterGroup *filterGroup = ioFilterGroupNew(); - TEST_ASSIGN(file, storageNewReadP(storageTest, fileName, .filterGroup = filterGroup), "new read file with filters"); - TEST_RESULT_PTR(ioReadFilterGroup(storageReadIo(file)), filterGroup, " check filter group is set"); } // ***************************************************************************************************************************** @@ -670,13 +665,6 @@ testRun(void) TEST_RESULT_VOID(storageWritePosixClose(storageWriteDriver(file)), " close file again"); TEST_RESULT_INT(storageInfoNP(storageTest, strPath(fileName)).mode, 0700, " check path mode"); TEST_RESULT_INT(storageInfoNP(storageTest, fileName).mode, 0600, " check file mode"); - - // ------------------------------------------------------------------------------------------------------------------------- - IoFilterGroup *filterGroup = ioFilterGroupNew(); - TEST_ASSIGN(file, storageNewWriteP(storageTest, fileName, .filterGroup = filterGroup), "new write file with filters"); - TEST_RESULT_VOID(ioWriteOpen(storageWriteIo(file)), " open file"); - TEST_RESULT_VOID(ioWriteClose(storageWriteIo(file)), " close file"); - TEST_RESULT_PTR(ioWriteFilterGroup(storageWriteIo(file)), filterGroup, " check filter group is set"); } // ***************************************************************************************************************************** diff --git a/test/src/module/storage/remoteTest.c b/test/src/module/storage/remoteTest.c index 23e75f454..e9c77c2eb 100644 --- a/test/src/module/storage/remoteTest.c +++ b/test/src/module/storage/remoteTest.c @@ -1,6 +1,7 @@ /*********************************************************************************************************************************** Test Remote Storage ***********************************************************************************************************************************/ +#include "common/crypto/cipherBlock.h" #include "common/io/bufferRead.h" #include "common/io/bufferWrite.h" @@ -139,6 +140,9 @@ testRun(void) storagePutNP(storageNewWriteNP(storageTest, strNew("repo/test.txt")), contentBuf); + // Disable protocol compression in the storage object to test no compression + ((StorageRemote *)storageRemote->driver)->compressLevel = 0; + StorageRead *fileRead = NULL; ioBufferSizeSet(8193); @@ -150,8 +154,20 @@ testRun(void) storageReadRemote(storageRead(fileRead), bufNew(32), false), 0, "nothing more to read"); + TEST_ASSIGN(fileRead, storageNewReadNP(storageRemote, strNew("test.txt")), "get file"); + TEST_RESULT_BOOL(bufEq(storageGetNP(fileRead), contentBuf), true, " check contents"); + TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, bufSize(contentBuf), " check read size"); + + // Enable protocol compression in the storage object + ((StorageRemote *)storageRemote->driver)->compressLevel = 3; + + TEST_ASSIGN( + fileRead, storageNewReadP(storageRemote, strNew("test.txt"), .compressible = true), "get file (protocol compress)"); + TEST_RESULT_BOOL(bufEq(storageGetNP(fileRead), contentBuf), true, " check contents"); + // We don't know how much protocol compression there will be exactly, but make sure this is some TEST_RESULT_BOOL( - bufEq(storageGetNP(storageNewReadNP(storageRemote, strNew("test.txt"))), contentBuf), true, "get file again"); + ((StorageReadRemote *)fileRead->driver)->protocolReadBytes < bufSize(contentBuf), true, + " check compressed read size"); TEST_ERROR( storageRemoteProtocolBlockSize(strNew("bogus")), ProtocolError, "'bogus' is not a valid block size message"); @@ -161,6 +177,7 @@ testRun(void) VariantList *paramList = varLstNew(); varLstAdd(paramList, varNewStr(strNew("missing.txt"))); varLstAdd(paramList, varNewBool(true)); + varLstAdd(paramList, varNewKv(kvNew())); TEST_RESULT_BOOL( storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), true, @@ -178,6 +195,12 @@ testRun(void) varLstAdd(paramList, varNewStr(strNew("test.txt"))); varLstAdd(paramList, varNewBool(false)); + // Create filters to test filter logic + IoFilterGroup *filterGroup = ioFilterGroupNew(); + ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false)); + ioFilterGroupAdd(filterGroup, gzipDecompressNew(false)); + varLstAdd(paramList, ioFilterGroupParamAll(filterGroup)); + TEST_RESULT_BOOL( storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), true, "protocol open read"); TEST_RESULT_STR( @@ -190,6 +213,21 @@ testRun(void) bufUsedSet(serverWrite, 0); ioBufferSizeSet(8192); + + // Check for error on a bogus filter + // ------------------------------------------------------------------------------------------------------------------------- + paramList = varLstNew(); + varLstAdd(paramList, varNewStr(strNew("test.txt"))); + varLstAdd(paramList, varNewBool(false)); + + // Create filters to test filter logic + filterGroup = ioFilterGroupNew(); + ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("X"), NULL)); + varLstAdd(paramList, ioFilterGroupParamAll(filterGroup)); + + TEST_ERROR( + storageRemoteProtocol( + PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), AssertError, "unable to add filter 'cipherBlock'"); } // ***************************************************************************************************************************** @@ -213,6 +251,9 @@ testRun(void) // ------------------------------------------------------------------------------------------------------------------------- ioBufferSizeSet(9999); + // Disable protocol compression in the storage object to test no compression + ((StorageRemote *)storageRemote->driver)->compressLevel = 0; + StorageWrite *write = NULL; TEST_ASSIGN(write, storageNewWriteNP(storageRemote, strNew("test.txt")), "new write file"); @@ -225,6 +266,7 @@ testRun(void) TEST_RESULT_BOOL(storageWriteSyncPath(write), true, "path is synced"); TEST_RESULT_VOID(storagePutNP(write, contentBuf), "write file"); + TEST_RESULT_UINT(((StorageWriteRemote *)write->driver)->protocolWriteBytes, bufSize(contentBuf), " check write size"); TEST_RESULT_VOID(storageWriteRemoteClose((StorageWriteRemote *)storageWriteDriver(write)), "close file again"); TEST_RESULT_VOID(storageWriteFree(write), "free file"); @@ -232,6 +274,9 @@ testRun(void) TEST_RESULT_BOOL( bufEq(storageGetNP(storageNewReadNP(storageRemote, strNew("test.txt"))), contentBuf), true, "check file"); + // Enable protocol compression in the storage object + ((StorageRemote *)storageRemote->driver)->compressLevel = 3; + // Write the file again, but this time free it before close and make sure the .tmp file is left // ------------------------------------------------------------------------------------------------------------------------- TEST_ASSIGN(write, storageNewWriteNP(storageRemote, strNew("test2.txt")), "new write file"); @@ -244,6 +289,14 @@ testRun(void) TEST_RESULT_UINT( storageInfoNP(storageTest, strNew("repo/test2.txt.pgbackrest.tmp")).size, 16384, "file exists and is partial"); + // Write the file again with protocol compression + // ------------------------------------------------------------------------------------------------------------------------- + TEST_ASSIGN(write, storageNewWriteP(storageRemote, strNew("test2.txt"), .compressible = true), "new write file (compress)"); + TEST_RESULT_VOID(storagePutNP(write, contentBuf), "write file"); + TEST_RESULT_BOOL( + ((StorageWriteRemote *)write->driver)->protocolWriteBytes < bufSize(contentBuf), true, + " check compressed write size"); + // Check protocol function directly (complete write) // ------------------------------------------------------------------------------------------------------------------------- ioBufferSizeSet(10); @@ -259,6 +312,7 @@ testRun(void) varLstAdd(paramList, varNewBool(true)); varLstAdd(paramList, varNewBool(true)); varLstAdd(paramList, varNewBool(true)); + varLstAdd(paramList, varNewKv(kvNew())); // Generate input (includes the input for the test below -- need a way to reset this for better testing) bufCat( @@ -299,6 +353,7 @@ testRun(void) varLstAdd(paramList, varNewBool(true)); varLstAdd(paramList, varNewBool(true)); varLstAdd(paramList, varNewBool(true)); + varLstAdd(paramList, varNewKv(kvNew())); TEST_RESULT_BOOL( storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR, paramList, server), true, "protocol open write");