mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2024-12-14 10:13:05 +02:00
Allow protocol compression when read/writing remote files.
If the file is compressible (i.e. not encrypted or already compressed) it can be marked as such in storageNewRead()/storageNewWrite(). If the file is being read from/written to a remote it will be compressed in transit using gzip. Simplify filter group handling by having the IoRead/IoWrite objects create the filter group automatically. This removes the need for a lot of NULL checking and has a negligible effect on performance since a filter group needs to be created eventually unless the source file is missing. Allow filters to be created using a VariantList so filter parameters can be passed to the remote.
This commit is contained in:
parent
62715ebf2d
commit
039e515a31
@ -216,7 +216,7 @@ command/archive/push/push.o: command/archive/push/push.c build.auto.h command/ar
|
||||
command/backup/common.o: command/backup/common.c build.auto.h command/backup/common.h common/assert.h common/debug.h common/error.auto.h common/error.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/string.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c command/backup/common.c -o command/backup/common.o
|
||||
|
||||
command/backup/pageChecksum.o: command/backup/pageChecksum.c build.auto.h command/backup/pageChecksum.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h postgres/pageChecksum.h
|
||||
command/backup/pageChecksum.o: command/backup/pageChecksum.c build.auto.h command/backup/pageChecksum.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h postgres/pageChecksum.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c command/backup/pageChecksum.c -o command/backup/pageChecksum.o
|
||||
|
||||
command/command.o: command/command.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/tls/client.h common/io/write.h common/lock.h common/log.h common/logLevel.h common/memContext.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h config/config.auto.h config/config.h config/define.auto.h config/define.h version.h
|
||||
@ -252,19 +252,19 @@ command/storage/list.o: command/storage/list.c build.auto.h common/assert.h comm
|
||||
common/compress/gzip/common.o: common/compress/gzip/common.c build.auto.h common/assert.h common/compress/gzip/common.h common/debug.h common/error.auto.h common/error.h common/logLevel.h common/memContext.h common/stackTrace.h common/type/convert.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/common.c -o common/compress/gzip/common.o
|
||||
|
||||
common/compress/gzip/compress.o: common/compress/gzip/compress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/compress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
common/compress/gzip/compress.o: common/compress/gzip/compress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/compress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/compress.c -o common/compress/gzip/compress.o
|
||||
|
||||
common/compress/gzip/decompress.o: common/compress/gzip/decompress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
common/compress/gzip/decompress.o: common/compress/gzip/decompress.c build.auto.h common/assert.h common/compress/gzip/common.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/compress/gzip/decompress.c -o common/compress/gzip/decompress.o
|
||||
|
||||
common/crypto/cipherBlock.o: common/crypto/cipherBlock.c build.auto.h common/assert.h common/crypto/cipherBlock.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
common/crypto/cipherBlock.o: common/crypto/cipherBlock.c build.auto.h common/assert.h common/crypto/cipherBlock.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/cipherBlock.c -o common/crypto/cipherBlock.o
|
||||
|
||||
common/crypto/common.o: common/crypto/common.c build.auto.h common/assert.h common/crypto/common.h common/debug.h common/error.auto.h common/error.h common/log.h common/logLevel.h common/stackTrace.h common/type/convert.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/common.c -o common/crypto/common.o
|
||||
|
||||
common/crypto/hash.o: common/crypto/hash.c build.auto.h common/assert.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
common/crypto/hash.o: common/crypto/hash.c build.auto.h common/assert.h common/crypto/common.h common/crypto/hash.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/crypto/hash.c -o common/crypto/hash.o
|
||||
|
||||
common/debug.o: common/debug.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/logLevel.h common/stackTrace.h common/type/convert.h
|
||||
@ -297,16 +297,16 @@ common/io/bufferRead.o: common/io/bufferRead.c build.auto.h common/assert.h comm
|
||||
common/io/bufferWrite.o: common/io/bufferWrite.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/bufferWrite.h common/io/filter/filter.h common/io/filter/group.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/bufferWrite.c -o common/io/bufferWrite.o
|
||||
|
||||
common/io/filter/buffer.o: common/io/filter/buffer.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
common/io/filter/buffer.o: common/io/filter/buffer.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/buffer.c -o common/io/filter/buffer.o
|
||||
|
||||
common/io/filter/filter.o: common/io/filter/filter.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
common/io/filter/filter.o: common/io/filter/filter.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/filter.c -o common/io/filter/filter.o
|
||||
|
||||
common/io/filter/group.o: common/io/filter/group.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/group.h common/io/io.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/list.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
common/io/filter/group.o: common/io/filter/group.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/buffer.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/group.h common/io/io.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/list.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/group.c -o common/io/filter/group.o
|
||||
|
||||
common/io/filter/size.o: common/io/filter/size.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/size.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
common/io/filter/size.o: common/io/filter/size.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/filter.intern.h common/io/filter/size.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c common/io/filter/size.c -o common/io/filter/size.o
|
||||
|
||||
common/io/handleRead.o: common/io/handleRead.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/handleRead.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h
|
||||
@ -510,16 +510,16 @@ storage/posix/write.o: storage/posix/write.c build.auto.h common/assert.h common
|
||||
storage/read.o: storage/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/variant.h common/type/variantList.h storage/read.h storage/read.intern.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/read.c -o storage/read.o
|
||||
|
||||
storage/remote/protocol.o: storage/remote/protocol.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
storage/remote/protocol.o: storage/remote/protocol.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/io.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/memContext.h common/regExp.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/server.h storage/helper.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/protocol.c -o storage/remote/protocol.o
|
||||
|
||||
storage/remote/read.o: storage/remote/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
storage/remote/read.o: storage/remote/read.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/read.c -o storage/remote/read.o
|
||||
|
||||
storage/remote/storage.o: storage/remote/storage.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/read.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/storage.c -o storage/remote/storage.o
|
||||
|
||||
storage/remote/write.o: storage/remote/write.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
storage/remote/write.o: storage/remote/write.c build.auto.h common/assert.h common/compress/gzip/compress.h common/compress/gzip/decompress.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h protocol/client.h protocol/command.h protocol/server.h storage/info.h storage/read.h storage/read.intern.h storage/remote/protocol.h storage/remote/storage.h storage/remote/storage.intern.h storage/remote/write.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
$(CC) $(CPPFLAGS) $(CFLAGS) $(CMAKE) -c storage/remote/write.c -o storage/remote/write.o
|
||||
|
||||
storage/s3/read.o: storage/s3/read.c build.auto.h common/assert.h common/debug.h common/error.auto.h common/error.h common/io/filter/filter.h common/io/filter/group.h common/io/http/client.h common/io/http/header.h common/io/http/query.h common/io/read.h common/io/read.intern.h common/io/write.h common/io/write.intern.h common/log.h common/logLevel.h common/macro.h common/memContext.h common/object.h common/stackTrace.h common/time.h common/type/buffer.h common/type/convert.h common/type/keyValue.h common/type/string.h common/type/stringList.h common/type/variant.h common/type/variantList.h storage/info.h storage/read.h storage/read.intern.h storage/s3/read.h storage/s3/storage.h storage/s3/storage.intern.h storage/storage.h storage/storage.intern.h storage/write.h storage/write.intern.h version.h
|
||||
|
@ -132,6 +132,9 @@ archiveGetFile(
|
||||
// By default result indicates WAL segment not found
|
||||
int result = 1;
|
||||
|
||||
// Is the file compressible during the copy?
|
||||
bool compressible = true;
|
||||
|
||||
// Test for stop file
|
||||
lockStopTest();
|
||||
|
||||
@ -146,26 +149,27 @@ archiveGetFile(
|
||||
storage, walDestination, .noCreatePath = true, .noSyncFile = !durable, .noSyncPath = !durable,
|
||||
.noAtomic = !durable);
|
||||
|
||||
// Add filters
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
|
||||
// If there is a cipher then add the decrypt filter
|
||||
if (cipherType != cipherTypeNone)
|
||||
{
|
||||
ioFilterGroupAdd(
|
||||
filterGroup, cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(archiveGetCheckResult.cipherPass), NULL));
|
||||
ioWriteFilterGroup(storageWriteIo(destination)), cipherBlockNew(cipherModeDecrypt, cipherType,
|
||||
BUFSTR(archiveGetCheckResult.cipherPass), NULL));
|
||||
compressible = false;
|
||||
}
|
||||
|
||||
// If file is compressed then add the decompression filter
|
||||
if (strEndsWithZ(archiveGetCheckResult.archiveFileActual, "." GZIP_EXT))
|
||||
ioFilterGroupAdd(filterGroup, gzipDecompressNew(false));
|
||||
|
||||
ioWriteFilterGroupSet(storageWriteIo(destination), filterGroup);
|
||||
{
|
||||
ioFilterGroupAdd(ioWriteFilterGroup(storageWriteIo(destination)), gzipDecompressNew(false));
|
||||
compressible = false;
|
||||
}
|
||||
|
||||
// Copy the file
|
||||
storageCopyNP(
|
||||
storageNewReadNP(
|
||||
storageRepo(), strNewFmt("%s/%s", STORAGE_REPO_ARCHIVE, strPtr(archiveGetCheckResult.archiveFileActual))),
|
||||
storageNewReadP(
|
||||
storageRepo(), strNewFmt("%s/%s", STORAGE_REPO_ARCHIVE, strPtr(archiveGetCheckResult.archiveFileActual)),
|
||||
.compressible = compressible),
|
||||
destination);
|
||||
|
||||
// The WAL file was found
|
||||
|
@ -74,8 +74,7 @@ archivePushFile(
|
||||
{
|
||||
// Generate a sha1 checksum for the wal segment. ??? Probably need a function in storage for this.
|
||||
IoRead *read = storageReadIo(storageNewReadNP(storageLocal(), walSource));
|
||||
IoFilterGroup *filterGroup = ioFilterGroupAdd(ioFilterGroupNew(), cryptoHashNew(HASH_TYPE_SHA1_STR));
|
||||
ioReadFilterGroupSet(read, filterGroup);
|
||||
ioFilterGroupAdd(ioReadFilterGroup(read), cryptoHashNew(HASH_TYPE_SHA1_STR));
|
||||
|
||||
Buffer *buffer = bufNew(ioBufferSize());
|
||||
ioReadOpen(read);
|
||||
@ -88,7 +87,7 @@ archivePushFile(
|
||||
while (!ioReadEof(read));
|
||||
|
||||
ioReadClose(read);
|
||||
const String *walSegmentChecksum = varStr(ioFilterGroupResult(filterGroup, CRYPTO_HASH_FILTER_TYPE_STR));
|
||||
const String *walSegmentChecksum = varStr(ioFilterGroupResult(ioReadFilterGroup(read), CRYPTO_HASH_FILTER_TYPE_STR));
|
||||
|
||||
// If the wal segment already exists in the repo then compare checksums
|
||||
walSegmentFile = walSegmentFind(storageRepo(), archiveId, archiveFile);
|
||||
@ -119,27 +118,32 @@ archivePushFile(
|
||||
{
|
||||
StorageRead *source = storageNewReadNP(storageLocal(), walSource);
|
||||
|
||||
// Add filters
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
// Is the file compressible during the copy?
|
||||
bool compressible = true;
|
||||
|
||||
// If the file will be compressed then add compression filter
|
||||
if (isSegment && compress)
|
||||
{
|
||||
strCat(archiveDestination, "." GZIP_EXT);
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNew(compressLevel, false));
|
||||
ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(source)), gzipCompressNew(compressLevel, false));
|
||||
compressible = false;
|
||||
}
|
||||
|
||||
// If there is a cipher then add the encrypt filter
|
||||
if (cipherType != cipherTypeNone)
|
||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL));
|
||||
|
||||
ioReadFilterGroupSet(storageReadIo(source), filterGroup);
|
||||
{
|
||||
ioFilterGroupAdd(
|
||||
ioReadFilterGroup(storageReadIo(source)),
|
||||
cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL));
|
||||
compressible = false;
|
||||
}
|
||||
|
||||
// Copy the file
|
||||
storageCopyNP(
|
||||
source,
|
||||
storageNewWriteNP(
|
||||
storageRepoWrite(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strPtr(archiveId), strPtr(archiveDestination))));
|
||||
storageNewWriteP(
|
||||
storageRepoWrite(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strPtr(archiveId), strPtr(archiveDestination)),
|
||||
.compressible = compressible));
|
||||
}
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
@ -229,7 +229,7 @@ pageChecksumNew(unsigned int segmentNo, unsigned int segmentPageTotal, size_t pa
|
||||
driver->valid = true;
|
||||
driver->align = true;
|
||||
|
||||
this = ioFilterNewP(PAGE_CHECKSUM_FILTER_TYPE_STR, driver, .in = pageChecksumProcess, .result = pageChecksumResult);
|
||||
this = ioFilterNewP(PAGE_CHECKSUM_FILTER_TYPE_STR, driver, NULL, .in = pageChecksumProcess, .result = pageChecksumResult);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
|
@ -55,7 +55,8 @@ restoreFile(
|
||||
// Was the file copied?
|
||||
bool result = true;
|
||||
|
||||
// Create destination file. We may not use this but it makes sense to only create it in one place if we do.
|
||||
// Is the file compressible during the copy?
|
||||
bool compressible = true;
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
@ -81,13 +82,12 @@ restoreFile(
|
||||
if (info.size == pgFileSize)
|
||||
{
|
||||
// Generate checksum for the file if size is not zero
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
IoRead *read = NULL;
|
||||
|
||||
if (info.size != 0)
|
||||
{
|
||||
IoRead *read = storageReadIo(storageNewReadNP(storagePgWrite(), pgFile));
|
||||
ioFilterGroupAdd(filterGroup, cryptoHashNew(HASH_TYPE_SHA1_STR));
|
||||
ioReadFilterGroupSet(read, filterGroup);
|
||||
read = storageReadIo(storageNewReadNP(storagePgWrite(), pgFile));
|
||||
ioFilterGroupAdd(ioReadFilterGroup(read), cryptoHashNew(HASH_TYPE_SHA1_STR));
|
||||
|
||||
Buffer *buffer = bufNew(ioBufferSize());
|
||||
ioReadOpen(read);
|
||||
@ -104,7 +104,8 @@ restoreFile(
|
||||
|
||||
// If size and checksum are equal then no need to copy the file
|
||||
if (pgFileSize == 0 ||
|
||||
strEq(pgFileChecksum, varStr(ioFilterGroupResult(filterGroup, CRYPTO_HASH_FILTER_TYPE_STR))))
|
||||
strEq(
|
||||
pgFileChecksum, varStr(ioFilterGroupResult(ioReadFilterGroup(read), CRYPTO_HASH_FILTER_TYPE_STR))))
|
||||
{
|
||||
// Even if hash/size are the same set the time back to backup time. This helps with unit testing, but
|
||||
// also presents a pristine version of the database after restore.
|
||||
@ -153,15 +154,21 @@ restoreFile(
|
||||
// Else perform the copy
|
||||
else
|
||||
{
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(pgFileWrite));
|
||||
|
||||
// Add decryption filter
|
||||
if (cipherPass != NULL)
|
||||
{
|
||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeDecrypt, cipherTypeAes256Cbc, BUFSTR(cipherPass), NULL));
|
||||
compressible = false;
|
||||
}
|
||||
|
||||
// Add decompression filter
|
||||
if (repoFileCompressed)
|
||||
{
|
||||
ioFilterGroupAdd(filterGroup, gzipDecompressNew(false));
|
||||
compressible = false;
|
||||
}
|
||||
|
||||
// Add sha1 filter
|
||||
ioFilterGroupAdd(filterGroup, cryptoHashNew(HASH_TYPE_SHA1_STR));
|
||||
@ -169,15 +176,14 @@ restoreFile(
|
||||
// Add size filter
|
||||
ioFilterGroupAdd(filterGroup, ioSizeNew());
|
||||
|
||||
ioWriteFilterGroupSet(storageWriteIo(pgFileWrite), filterGroup);
|
||||
|
||||
// Copy file
|
||||
storageCopyNP(
|
||||
storageNewReadNP(
|
||||
storageNewReadP(
|
||||
storageRepo(),
|
||||
strNewFmt(
|
||||
STORAGE_REPO_BACKUP "/%s/%s%s", strPtr(repoFileReference), strPtr(repoFile),
|
||||
repoFileCompressed ? "." GZIP_EXT : "")),
|
||||
repoFileCompressed ? "." GZIP_EXT : ""),
|
||||
.compressible = compressible),
|
||||
pgFileWrite);
|
||||
|
||||
// Validate checksum
|
||||
|
@ -17,8 +17,7 @@ Gzip Compress
|
||||
/***********************************************************************************************************************************
|
||||
Filter type constant
|
||||
***********************************************************************************************************************************/
|
||||
#define GZIP_COMPRESS_FILTER_TYPE "gzipCompress"
|
||||
STRING_STATIC(GZIP_COMPRESS_FILTER_TYPE_STR, GZIP_COMPRESS_FILTER_TYPE);
|
||||
STRING_EXTERN(GZIP_COMPRESS_FILTER_TYPE_STR, GZIP_COMPRESS_FILTER_TYPE);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Object type
|
||||
@ -185,12 +184,23 @@ gzipCompressNew(int level, bool raw)
|
||||
// Set free callback to ensure gzip context is freed
|
||||
memContextCallbackSet(driver->memContext, gzipCompressFreeResource, driver);
|
||||
|
||||
// Create param list
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewInt(level));
|
||||
varLstAdd(paramList, varNewBool(raw));
|
||||
|
||||
// Create filter interface
|
||||
this = ioFilterNewP(
|
||||
GZIP_COMPRESS_FILTER_TYPE_STR, driver, .done = gzipCompressDone, .inOut = gzipCompressProcess,
|
||||
GZIP_COMPRESS_FILTER_TYPE_STR, driver, paramList, .done = gzipCompressDone, .inOut = gzipCompressProcess,
|
||||
.inputSame = gzipCompressInputSame);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER, this);
|
||||
}
|
||||
|
||||
IoFilter *
|
||||
gzipCompressNewVar(const VariantList *paramList)
|
||||
{
|
||||
return gzipCompressNew(varIntForce(varLstGet(paramList, 0)), varBool(varLstGet(paramList, 1)));
|
||||
}
|
||||
|
@ -8,9 +8,16 @@ Compress IO using the gzip format.
|
||||
|
||||
#include "common/io/filter/filter.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Filter type constant
|
||||
***********************************************************************************************************************************/
|
||||
#define GZIP_COMPRESS_FILTER_TYPE "gzipCompress"
|
||||
STRING_DECLARE(GZIP_COMPRESS_FILTER_TYPE_STR);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Constructor
|
||||
***********************************************************************************************************************************/
|
||||
IoFilter *gzipCompressNew(int level, bool raw);
|
||||
IoFilter *gzipCompressNewVar(const VariantList *paramList);
|
||||
|
||||
#endif
|
||||
|
@ -17,8 +17,7 @@ Gzip Decompress
|
||||
/***********************************************************************************************************************************
|
||||
Filter type constant
|
||||
***********************************************************************************************************************************/
|
||||
#define GZIP_DECOMPRESS_FILTER_TYPE "gzipDecompress"
|
||||
STRING_STATIC(GZIP_DECOMPRESS_FILTER_TYPE_STR, GZIP_DECOMPRESS_FILTER_TYPE);
|
||||
STRING_EXTERN(GZIP_DECOMPRESS_FILTER_TYPE_STR, GZIP_DECOMPRESS_FILTER_TYPE);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Object type
|
||||
@ -162,12 +161,22 @@ gzipDecompressNew(bool raw)
|
||||
// Set free callback to ensure gzip context is freed
|
||||
memContextCallbackSet(driver->memContext, gzipDecompressFreeResource, driver);
|
||||
|
||||
// Create param list
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewBool(raw));
|
||||
|
||||
// Create filter interface
|
||||
this = ioFilterNewP(
|
||||
GZIP_DECOMPRESS_FILTER_TYPE_STR, driver, .done = gzipDecompressDone, .inOut = gzipDecompressProcess,
|
||||
GZIP_DECOMPRESS_FILTER_TYPE_STR, driver, paramList, .done = gzipDecompressDone, .inOut = gzipDecompressProcess,
|
||||
.inputSame = gzipDecompressInputSame);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER, this);
|
||||
}
|
||||
|
||||
IoFilter *
|
||||
gzipDecompressNewVar(const VariantList *paramList)
|
||||
{
|
||||
return gzipDecompressNew(varBool(varLstGet(paramList, 0)));
|
||||
}
|
||||
|
@ -8,9 +8,16 @@ Decompress IO from the gzip format.
|
||||
|
||||
#include "common/io/filter/filter.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Filter type constant
|
||||
***********************************************************************************************************************************/
|
||||
#define GZIP_DECOMPRESS_FILTER_TYPE "gzipDecompress"
|
||||
STRING_DECLARE(GZIP_DECOMPRESS_FILTER_TYPE_STR);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Constructor
|
||||
***********************************************************************************************************************************/
|
||||
IoFilter *gzipDecompressNew(bool raw);
|
||||
IoFilter *gzipDecompressNewVar(const VariantList *paramList);
|
||||
|
||||
#endif
|
||||
|
@ -437,7 +437,7 @@ cipherBlockNew(CipherMode mode, CipherType cipherType, const Buffer *pass, const
|
||||
|
||||
// Create filter interface
|
||||
this = ioFilterNewP(
|
||||
CIPHER_BLOCK_FILTER_TYPE_STR, driver, .done = cipherBlockDone, .inOut = cipherBlockProcess,
|
||||
CIPHER_BLOCK_FILTER_TYPE_STR, driver, NULL, .done = cipherBlockDone, .inOut = cipherBlockProcess,
|
||||
.inputSame = cipherBlockInputSame);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
@ -164,7 +164,7 @@ cryptoHashNew(const String *type)
|
||||
cryptoError(!EVP_DigestInit_ex(driver->hashContext, driver->hashType, NULL), "unable to initialize hash context");
|
||||
|
||||
// Create filter interface
|
||||
this = ioFilterNewP(CRYPTO_HASH_FILTER_TYPE_STR, driver, .in = cryptoHashProcess, .result = cryptoHashResult);
|
||||
this = ioFilterNewP(CRYPTO_HASH_FILTER_TYPE_STR, driver, NULL, .in = cryptoHashProcess, .result = cryptoHashResult);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
|
@ -121,7 +121,7 @@ ioBufferNew(void)
|
||||
IoBuffer *driver = memNew(sizeof(IoBuffer));
|
||||
driver->memContext = memContextCurrent();
|
||||
|
||||
this = ioFilterNewP(BUFFER_FILTER_TYPE_STR, driver, .inOut = ioBufferProcess, .inputSame = ioBufferInputSame);
|
||||
this = ioFilterNewP(BUFFER_FILTER_TYPE_STR, driver, NULL, .inOut = ioBufferProcess, .inputSame = ioBufferInputSame);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
|
@ -17,6 +17,7 @@ struct IoFilter
|
||||
MemContext *memContext; // Mem context of filter
|
||||
const String *type; // Filter type
|
||||
void *driver; // Filter driver
|
||||
const VariantList *paramList; // Filter parameters
|
||||
IoFilterInterface interface; // Filter interface
|
||||
|
||||
bool flushing; // Has the filter started flushing?
|
||||
@ -30,11 +31,12 @@ New object
|
||||
Allocations will be in the memory context of the caller.
|
||||
***********************************************************************************************************************************/
|
||||
IoFilter *
|
||||
ioFilterNew(const String *type, void *driver, IoFilterInterface interface)
|
||||
ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterInterface interface)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STRING, type);
|
||||
FUNCTION_LOG_PARAM_P(VOID, driver);
|
||||
FUNCTION_LOG_PARAM(VARIANT_LIST, paramList);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_INTERFACE, interface);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
@ -53,6 +55,7 @@ ioFilterNew(const String *type, void *driver, IoFilterInterface interface)
|
||||
this->memContext = memContextCurrent();
|
||||
this->type = type;
|
||||
this->driver = driver;
|
||||
this->paramList = paramList;
|
||||
this->interface = interface;
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_FILTER, this);
|
||||
@ -202,7 +205,7 @@ ioFilterInterface(const IoFilter *this)
|
||||
/***********************************************************************************************************************************
|
||||
Does filter produce output?
|
||||
|
||||
All InOut filters produce output.
|
||||
All In filters produce output.
|
||||
***********************************************************************************************************************************/
|
||||
bool
|
||||
ioFilterOutput(const IoFilter *this)
|
||||
@ -216,6 +219,21 @@ ioFilterOutput(const IoFilter *this)
|
||||
FUNCTION_TEST_RETURN(this->interface.inOut != NULL);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
List of filter parameters
|
||||
***********************************************************************************************************************************/
|
||||
const VariantList *
|
||||
ioFilterParamList(const IoFilter *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(IO_FILTER, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_TEST_RETURN(this->paramList);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get filter result
|
||||
***********************************************************************************************************************************/
|
||||
|
@ -19,6 +19,7 @@ Each filter has a type that allows it to be identified in the filter list.
|
||||
#define COMMON_IO_FILTER_FILTER_INTERN_H
|
||||
|
||||
#include "common/io/filter/filter.h"
|
||||
#include "common/type/variantList.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Constructor
|
||||
@ -48,10 +49,10 @@ typedef struct IoFilterInterface
|
||||
Variant *(*result)(void *driver);
|
||||
} IoFilterInterface;
|
||||
|
||||
#define ioFilterNewP(type, driver, ...) \
|
||||
ioFilterNew(type, driver, (IoFilterInterface){__VA_ARGS__})
|
||||
#define ioFilterNewP(type, driver, paramList, ...) \
|
||||
ioFilterNew(type, driver, paramList, (IoFilterInterface){__VA_ARGS__})
|
||||
|
||||
IoFilter *ioFilterNew(const String *type, void *driver, IoFilterInterface);
|
||||
IoFilter *ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterInterface);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Functions
|
||||
@ -68,6 +69,7 @@ void *ioFilterDriver(IoFilter *this);
|
||||
bool ioFilterInputSame(const IoFilter *this);
|
||||
const IoFilterInterface *ioFilterInterface(const IoFilter *this);
|
||||
bool ioFilterOutput(const IoFilter *this);
|
||||
const VariantList *ioFilterParamList(const IoFilter *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Macros for function logging
|
||||
|
@ -106,7 +106,7 @@ ioFilterGroupAdd(IoFilterGroup *this, IoFilter *filter)
|
||||
Get a filter
|
||||
***********************************************************************************************************************************/
|
||||
static IoFilterData *
|
||||
ioFilterGroupGet(IoFilterGroup *this, unsigned int filterIdx)
|
||||
ioFilterGroupGet(const IoFilterGroup *this, unsigned int filterIdx)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this);
|
||||
@ -136,13 +136,16 @@ ioFilterGroupOpen(IoFilterGroup *this)
|
||||
{
|
||||
// If the last filter is not an output filter then add a filter to buffer/copy data. Input filters won't copy to an output
|
||||
// buffer so we need some way to get the data to the output buffer.
|
||||
if (lstSize(this->filterList) == 0 || !ioFilterOutput((ioFilterGroupGet(this, lstSize(this->filterList) - 1))->filter))
|
||||
if (ioFilterGroupSize(this) == 0 ||
|
||||
!ioFilterOutput((ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->filter))
|
||||
{
|
||||
ioFilterGroupAdd(this, ioBufferNew());
|
||||
}
|
||||
|
||||
// Create filter input/output buffers. Input filters do not get an output buffer since they don't produce output.
|
||||
Buffer **lastOutputBuffer = NULL;
|
||||
|
||||
for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++)
|
||||
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
|
||||
{
|
||||
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
|
||||
|
||||
@ -163,7 +166,7 @@ ioFilterGroupOpen(IoFilterGroup *this)
|
||||
|
||||
// If this is not the last output filter then create a new output buffer for it. The output buffer for the last filter
|
||||
// will be provided to the process function.
|
||||
if (ioFilterOutput(filterData->filter) && filterIdx < lstSize(this->filterList) - 1)
|
||||
if (ioFilterOutput(filterData->filter) && filterIdx < ioFilterGroupSize(this) - 1)
|
||||
{
|
||||
filterData->output = bufNew(ioBufferSize());
|
||||
lastOutputBuffer = &filterData->output;
|
||||
@ -207,7 +210,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
|
||||
|
||||
// Assign input and output buffers
|
||||
this->input = input;
|
||||
(ioFilterGroupGet(this, lstSize(this->filterList) - 1))->output = output;
|
||||
(ioFilterGroupGet(this, ioFilterGroupSize(this) - 1))->output = output;
|
||||
|
||||
//
|
||||
do
|
||||
@ -220,7 +223,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
|
||||
if (this->inputSame)
|
||||
{
|
||||
this->inputSame = false;
|
||||
filterIdx = lstSize(this->filterList);
|
||||
filterIdx = ioFilterGroupSize(this);
|
||||
|
||||
do
|
||||
{
|
||||
@ -242,7 +245,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
|
||||
|
||||
// Process forward from the filter that has input to process. This may be a filter that needs the same input or it may be
|
||||
// new input for the first filter.
|
||||
for (; filterIdx < lstSize(this->filterList); filterIdx++)
|
||||
for (; filterIdx < ioFilterGroupSize(this); filterIdx++)
|
||||
{
|
||||
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
|
||||
|
||||
@ -288,7 +291,7 @@ ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output)
|
||||
this->done = true;
|
||||
this->inputSame = false;
|
||||
|
||||
for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++)
|
||||
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
|
||||
{
|
||||
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
|
||||
|
||||
@ -321,7 +324,7 @@ ioFilterGroupClose(IoFilterGroup *this)
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(this->opened && !this->closed);
|
||||
|
||||
for (unsigned int filterIdx = 0; filterIdx < lstSize(this->filterList); filterIdx++)
|
||||
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
|
||||
{
|
||||
IoFilterData *filterData = ioFilterGroupGet(this, filterIdx);
|
||||
const Variant *filterResult = ioFilterResult(filterData->filter);
|
||||
@ -350,25 +353,6 @@ ioFilterGroupClose(IoFilterGroup *this)
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Move the object to a new context
|
||||
***********************************************************************************************************************************/
|
||||
IoFilterGroup *
|
||||
ioFilterGroupMove(IoFilterGroup *this, MemContext *parentNew)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this);
|
||||
FUNCTION_TEST_PARAM(MEM_CONTEXT, parentNew);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(parentNew != NULL);
|
||||
|
||||
if (this != NULL)
|
||||
memContextMove(this->memContext, parentNew);
|
||||
|
||||
FUNCTION_TEST_RETURN(this);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Is the filter group done processing?
|
||||
***********************************************************************************************************************************/
|
||||
@ -403,6 +387,37 @@ ioFilterGroupInputSame(const IoFilterGroup *this)
|
||||
FUNCTION_TEST_RETURN(this->inputSame);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get all filters and parameters so they can be passed to a remote
|
||||
***********************************************************************************************************************************/
|
||||
Variant *
|
||||
ioFilterGroupParamAll(const IoFilterGroup *this)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(!this->opened);
|
||||
ASSERT(this->filterList != NULL);
|
||||
|
||||
KeyValue *result = kvNew();
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
for (unsigned int filterIdx = 0; filterIdx < ioFilterGroupSize(this); filterIdx++)
|
||||
{
|
||||
IoFilter *filter = ioFilterGroupGet(this, filterIdx)->filter;
|
||||
const VariantList *paramList = ioFilterParamList(filter);
|
||||
|
||||
kvAdd(result, VARSTR(ioFilterType(filter)), paramList ? varNewVarLst(paramList) : NULL);
|
||||
}
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(VARIANT, varNewKv(result));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get filter results
|
||||
***********************************************************************************************************************************/
|
||||
@ -429,11 +444,50 @@ ioFilterGroupResult(const IoFilterGroup *this, const String *filterType)
|
||||
FUNCTION_LOG_RETURN_CONST(VARIANT, result);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get all filter results
|
||||
***********************************************************************************************************************************/
|
||||
const Variant *
|
||||
ioFilterGroupResultAll(const IoFilterGroup *this)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(this->closed);
|
||||
|
||||
FUNCTION_LOG_RETURN_CONST(VARIANT, varNewKv(this->filterResult));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Return total number of filters
|
||||
***********************************************************************************************************************************/
|
||||
unsigned int
|
||||
ioFilterGroupSize(const IoFilterGroup *this)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(IO_FILTER_GROUP, this);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
FUNCTION_TEST_RETURN(lstSize(this->filterList));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Render as string for logging
|
||||
***********************************************************************************************************************************/
|
||||
String *
|
||||
ioFilterGroupToLog(const IoFilterGroup *this)
|
||||
{
|
||||
return strNewFmt("{inputSame: %s, done: %s}", cvtBoolToConstZ(this->inputSame), cvtBoolToConstZ(this->done));
|
||||
return strNewFmt(
|
||||
"{inputSame: %s, done: %s"
|
||||
#ifdef DEBUG
|
||||
", opened %s, flushing %s, closed %s"
|
||||
#endif
|
||||
"}",
|
||||
cvtBoolToConstZ(this->inputSame), cvtBoolToConstZ(this->done)
|
||||
#ifdef DEBUG
|
||||
, cvtBoolToConstZ(this->opened), cvtBoolToConstZ(this->flushing), cvtBoolToConstZ(this->closed)
|
||||
#endif
|
||||
);
|
||||
}
|
||||
|
@ -34,14 +34,15 @@ void ioFilterGroupOpen(IoFilterGroup *this);
|
||||
void ioFilterGroupProcess(IoFilterGroup *this, const Buffer *input, Buffer *output);
|
||||
void ioFilterGroupClose(IoFilterGroup *this);
|
||||
|
||||
IoFilterGroup *ioFilterGroupMove(IoFilterGroup *this, MemContext *parentNew);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Getters
|
||||
***********************************************************************************************************************************/
|
||||
bool ioFilterGroupDone(const IoFilterGroup *this);
|
||||
bool ioFilterGroupInputSame(const IoFilterGroup *this);
|
||||
Variant *ioFilterGroupParamAll(const IoFilterGroup *this);
|
||||
const Variant *ioFilterGroupResult(const IoFilterGroup *this, const String *filterType);
|
||||
const Variant *ioFilterGroupResultAll(const IoFilterGroup *this);
|
||||
unsigned int ioFilterGroupSize(const IoFilterGroup *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Destructor
|
||||
|
@ -94,7 +94,7 @@ ioSizeNew(void)
|
||||
IoSize *driver = memNew(sizeof(IoSize));
|
||||
driver->memContext = memContextCurrent();
|
||||
|
||||
this = ioFilterNewP(SIZE_FILTER_TYPE_STR, driver, .in = ioSizeProcess, .result = ioSizeResult);
|
||||
this = ioFilterNewP(SIZE_FILTER_TYPE_STR, driver, NULL, .in = ioSizeProcess, .result = ioSizeResult);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
|
@ -56,6 +56,7 @@ ioReadNew(void *driver, IoReadInterface interface)
|
||||
this->memContext = memContextCurrent();
|
||||
this->driver = driver;
|
||||
this->interface = interface;
|
||||
this->filterGroup = ioFilterGroupNew();
|
||||
this->input = bufNew(ioBufferSize());
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
@ -75,25 +76,14 @@ ioReadOpen(IoRead *this)
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(!this->opened && !this->closed);
|
||||
ASSERT(ioFilterGroupSize(this->filterGroup) == 0 || !ioReadBlock(this));
|
||||
|
||||
// Open if the driver has an open function
|
||||
bool result = this->interface.open != NULL ? this->interface.open(this->driver) : true;
|
||||
|
||||
// Only open the filter group if the read was opened
|
||||
if (result)
|
||||
{
|
||||
// If no filter group exists create one to do buffering
|
||||
if (this->filterGroup == NULL)
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(this->memContext)
|
||||
{
|
||||
this->filterGroup = ioFilterGroupNew();
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
}
|
||||
|
||||
ioFilterGroupOpen(this->filterGroup);
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
this->opened = result;
|
||||
@ -365,9 +355,7 @@ ioReadEof(const IoRead *this)
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get/set filters
|
||||
|
||||
Filters must be set before open and cannot be reset.
|
||||
Get filter group if filters need to be added
|
||||
***********************************************************************************************************************************/
|
||||
IoFilterGroup *
|
||||
ioReadFilterGroup(const IoRead *this)
|
||||
@ -381,25 +369,6 @@ ioReadFilterGroup(const IoRead *this)
|
||||
FUNCTION_TEST_RETURN(this->filterGroup);
|
||||
}
|
||||
|
||||
IoRead *
|
||||
ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(IO_READ, this);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, filterGroup);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(filterGroup != NULL);
|
||||
ASSERT(this->filterGroup == NULL);
|
||||
ASSERT(!this->opened && !this->closed);
|
||||
ASSERT(!ioReadBlock(this));
|
||||
|
||||
this->filterGroup = ioFilterGroupMove(filterGroup, this->memContext);
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_READ, this);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Handle (file descriptor) for the read object
|
||||
|
||||
|
@ -34,7 +34,6 @@ Getters/Setters
|
||||
bool ioReadBlock(const IoRead *this);
|
||||
bool ioReadEof(const IoRead *this);
|
||||
IoFilterGroup *ioReadFilterGroup(const IoRead *this);
|
||||
IoRead *ioReadFilterGroupSet(IoRead *this, IoFilterGroup *filterGroup);
|
||||
int ioReadHandle(const IoRead *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
|
@ -24,7 +24,7 @@ struct IoWrite
|
||||
Buffer *output; // Output buffer
|
||||
|
||||
#ifdef DEBUG
|
||||
bool filterGroupSet; // Was an IoFilterGroup set?
|
||||
bool filterGroupSet; // Were filters set?
|
||||
bool opened; // Has the io been opened?
|
||||
bool closed; // Has the io been closed?
|
||||
#endif
|
||||
@ -54,6 +54,7 @@ ioWriteNew(void *driver, IoWriteInterface interface)
|
||||
this->memContext = memContextCurrent();
|
||||
this->driver = driver;
|
||||
this->interface = interface;
|
||||
this->filterGroup = ioFilterGroupNew();
|
||||
this->output = bufNew(ioBufferSize());
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
@ -77,16 +78,12 @@ ioWriteOpen(IoWrite *this)
|
||||
if (this->interface.open != NULL)
|
||||
this->interface.open(this->driver);
|
||||
|
||||
// If no filter group exists create one to do buffering
|
||||
if (this->filterGroup == NULL)
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(this->memContext)
|
||||
{
|
||||
this->filterGroup = ioFilterGroupNew();
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
}
|
||||
// Track whether filters were added to prevent flush() from being called later since flush() won't work with most filters
|
||||
#ifdef DEBUG
|
||||
this->filterGroupSet = ioFilterGroupSize(this->filterGroup) > 0;
|
||||
#endif
|
||||
|
||||
// Open the filter group
|
||||
ioFilterGroupOpen(this->filterGroup);
|
||||
|
||||
#ifdef DEBUG
|
||||
@ -287,29 +284,6 @@ ioWriteFilterGroup(const IoWrite *this)
|
||||
FUNCTION_TEST_RETURN(this->filterGroup);
|
||||
}
|
||||
|
||||
IoWrite *
|
||||
ioWriteFilterGroupSet(IoWrite *this, IoFilterGroup *filterGroup)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(IO_WRITE, this);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, filterGroup);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
ASSERT(filterGroup != NULL);
|
||||
ASSERT(this->filterGroup == NULL);
|
||||
ASSERT(!this->opened && !this->closed);
|
||||
|
||||
// Track whether a filter group was set to prevent flush() from being called later
|
||||
#ifdef DEBUG
|
||||
this->filterGroupSet = true;
|
||||
#endif
|
||||
|
||||
this->filterGroup = ioFilterGroupMove(filterGroup, this->memContext);
|
||||
|
||||
FUNCTION_LOG_RETURN(IO_WRITE, this);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Handle (file descriptor) for the write object
|
||||
|
||||
|
@ -34,7 +34,6 @@ void ioWriteClose(IoWrite *this);
|
||||
Getters/Setters
|
||||
***********************************************************************************************************************************/
|
||||
IoFilterGroup *ioWriteFilterGroup(const IoWrite *this);
|
||||
IoWrite *ioWriteFilterGroupSet(IoWrite *this, IoFilterGroup *filterGroup);
|
||||
int ioWriteHandle(const IoWrite *this);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
|
@ -142,13 +142,13 @@ infoLoad(Info *this, const Storage *storage, const String *fileName, bool copyFi
|
||||
const String *fileNameExt = copyFile ? strNewFmt("%s" INFO_COPY_EXT, strPtr(fileName)) : fileName;
|
||||
|
||||
// Attempt to load the file
|
||||
StorageRead *infoRead = storageNewReadNP(storage, fileNameExt);
|
||||
StorageRead *infoRead = storageNewReadP(storage, fileNameExt, .compressible = cipherType == cipherTypeNone);
|
||||
|
||||
if (cipherType != cipherTypeNone)
|
||||
{
|
||||
ioReadFilterGroupSet(
|
||||
storageReadIo(infoRead),
|
||||
ioFilterGroupAdd(ioFilterGroupNew(), cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(cipherPass), NULL)));
|
||||
ioFilterGroupAdd(
|
||||
ioReadFilterGroup(storageReadIo(infoRead)),
|
||||
cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(cipherPass), NULL));
|
||||
}
|
||||
|
||||
// Load and parse the info file
|
||||
@ -334,13 +334,12 @@ infoSave(
|
||||
iniSet(ini, INFO_SECTION_BACKREST_STR, INFO_KEY_CHECKSUM_STR, jsonFromStr(infoHash(ini)));
|
||||
|
||||
// Save info file
|
||||
IoWrite *infoWrite = storageWriteIo(storageNewWriteNP(storage, fileName));
|
||||
IoWrite *infoWrite = storageWriteIo(storageNewWriteP(storage, fileName, .compressible = cipherType == cipherTypeNone));
|
||||
|
||||
if (cipherType != cipherTypeNone)
|
||||
{
|
||||
ioWriteFilterGroupSet(
|
||||
infoWrite,
|
||||
ioFilterGroupAdd(ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL)));
|
||||
ioFilterGroupAdd(
|
||||
ioWriteFilterGroup(infoWrite), cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL));
|
||||
}
|
||||
|
||||
iniSave(ini, infoWrite);
|
||||
|
@ -278,7 +278,7 @@ storageRepoGet(const String *type, bool write)
|
||||
{
|
||||
result = storageRemoteNew(
|
||||
STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, write, storageRepoPathExpression,
|
||||
protocolRemoteGet(protocolStorageTypeRepo));
|
||||
protocolRemoteGet(protocolStorageTypeRepo), cfgOptionUInt(cfgOptCompressLevelNetwork));
|
||||
}
|
||||
// Use CIFS storage
|
||||
else if (strEqZ(type, STORAGE_TYPE_CIFS))
|
||||
|
@ -394,7 +394,7 @@ storagePosixMove(THIS_VOID, StorageRead *source, StorageWrite *destination)
|
||||
New file read object
|
||||
***********************************************************************************************************************************/
|
||||
static StorageRead *
|
||||
storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing)
|
||||
storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible)
|
||||
{
|
||||
THIS(StoragePosix);
|
||||
|
||||
@ -402,6 +402,7 @@ storagePosixNewRead(THIS_VOID, const String *file, bool ignoreMissing)
|
||||
FUNCTION_LOG_PARAM(STORAGE_POSIX, this);
|
||||
FUNCTION_LOG_PARAM(STRING, file);
|
||||
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
|
||||
(void)compressible;
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
@ -416,7 +417,7 @@ New file write object
|
||||
static StorageWrite *
|
||||
storagePosixNewWrite(
|
||||
THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified,
|
||||
bool createPath, bool syncFile, bool syncPath, bool atomic)
|
||||
bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible)
|
||||
{
|
||||
THIS(StoragePosix);
|
||||
|
||||
@ -432,6 +433,7 @@ storagePosixNewWrite(
|
||||
FUNCTION_LOG_PARAM(BOOL, syncFile);
|
||||
FUNCTION_LOG_PARAM(BOOL, syncPath);
|
||||
FUNCTION_LOG_PARAM(BOOL, atomic);
|
||||
(void)compressible;
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
@ -14,6 +14,8 @@ typedef struct StorageReadInterface
|
||||
{
|
||||
const String * type;
|
||||
const String * name;
|
||||
bool compressible; // Is this file compressible?
|
||||
unsigned int compressLevel; // Level to use for compression
|
||||
bool ignoreMissing;
|
||||
IoReadInterface ioInterface;
|
||||
} StorageReadInterface;
|
||||
|
@ -3,6 +3,8 @@ Remote Storage Protocol Handler
|
||||
***********************************************************************************************************************************/
|
||||
#include "build.auto.h"
|
||||
|
||||
#include "common/compress/gzip/compress.h"
|
||||
#include "common/compress/gzip/decompress.h"
|
||||
#include "common/debug.h"
|
||||
#include "common/io/io.h"
|
||||
#include "common/log.h"
|
||||
@ -40,6 +42,38 @@ static struct
|
||||
RegExp *blockRegExp; // Regular expression to check block messages
|
||||
} storageRemoteProtocolLocal;
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Set filter group based on passed filters
|
||||
***********************************************************************************************************************************/
|
||||
static void
|
||||
storageRemoteFilterGroup(IoFilterGroup *filterGroup, const Variant *filterList)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
FUNCTION_TEST_PARAM(IO_FILTER_GROUP, filterGroup);
|
||||
FUNCTION_TEST_PARAM(VARIANT, filterList);
|
||||
FUNCTION_TEST_END();
|
||||
|
||||
ASSERT(filterGroup != NULL);
|
||||
ASSERT(filterList != NULL);
|
||||
|
||||
const VariantList *filterKeyList = kvKeyList(varKv(filterList));
|
||||
|
||||
for (unsigned int filterIdx = 0; filterIdx < varLstSize(filterKeyList); filterIdx++)
|
||||
{
|
||||
const String *filterKey = varStr(varLstGet(filterKeyList, filterIdx));
|
||||
const VariantList *filterParam = varVarLst(kvGet(varKv(filterList), varLstGet(filterKeyList, filterIdx)));
|
||||
|
||||
if (strEq(filterKey, GZIP_COMPRESS_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNewVar(filterParam));
|
||||
else if (strEq(filterKey, GZIP_DECOMPRESS_FILTER_TYPE_STR))
|
||||
ioFilterGroupAdd(filterGroup, gzipDecompressNewVar(filterParam));
|
||||
else
|
||||
THROW_FMT(AssertError, "unable to add filter '%s'", strPtr(filterKey));
|
||||
}
|
||||
|
||||
FUNCTION_TEST_RETURN_VOID();
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Process storage protocol requests
|
||||
***********************************************************************************************************************************/
|
||||
@ -87,7 +121,10 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto
|
||||
// Create the read object
|
||||
IoRead *fileRead = storageReadIo(
|
||||
interface.newRead(
|
||||
driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varBool(varLstGet(paramList, 1))));
|
||||
driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varBool(varLstGet(paramList, 1)), false));
|
||||
|
||||
// Set filter group based on passed filters
|
||||
storageRemoteFilterGroup(ioReadFilterGroup(fileRead), varLstGet(paramList, 2));
|
||||
|
||||
// Check if the file exists
|
||||
bool exists = ioReadOpen(fileRead);
|
||||
@ -127,7 +164,10 @@ storageRemoteProtocol(const String *command, const VariantList *paramList, Proto
|
||||
driver, storagePathNP(storage, varStr(varLstGet(paramList, 0))), varUIntForce(varLstGet(paramList, 1)),
|
||||
varUIntForce(varLstGet(paramList, 2)), varStr(varLstGet(paramList, 3)), varStr(varLstGet(paramList, 4)),
|
||||
(time_t)varIntForce(varLstGet(paramList, 5)), varBool(varLstGet(paramList, 6)),
|
||||
varBool(varLstGet(paramList, 7)), varBool(varLstGet(paramList, 8)), varBool(varLstGet(paramList, 9))));
|
||||
varBool(varLstGet(paramList, 7)), varBool(varLstGet(paramList, 8)), varBool(varLstGet(paramList, 9)), false));
|
||||
|
||||
// Set filter group based on passed filters
|
||||
storageRemoteFilterGroup(ioWriteFilterGroup(fileWrite), varLstGet(paramList, 10));
|
||||
|
||||
// Open file
|
||||
ioWriteOpen(fileWrite);
|
||||
|
@ -6,6 +6,8 @@ Remote Storage Read
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "common/compress/gzip/compress.h"
|
||||
#include "common/compress/gzip/decompress.h"
|
||||
#include "common/debug.h"
|
||||
#include "common/io/read.intern.h"
|
||||
#include "common/log.h"
|
||||
@ -24,10 +26,15 @@ typedef struct StorageReadRemote
|
||||
MemContext *memContext; // Object mem context
|
||||
StorageReadInterface interface; // Interface
|
||||
StorageRemote *storage; // Storage that created this object
|
||||
StorageRead *read; // Storage read interface
|
||||
|
||||
ProtocolClient *client; // Protocol client for requests
|
||||
size_t remaining; // Bytes remaining to be read in block
|
||||
bool eof; // Has the file reached eof?
|
||||
|
||||
#ifdef DEBUG
|
||||
uint64_t protocolReadBytes; // How many bytes were read from the protocol layer?
|
||||
#endif
|
||||
} StorageReadRemote;
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@ -56,9 +63,25 @@ storageReadRemoteOpen(THIS_VOID)
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
|
||||
// If the file is compressible add compression filter on the remote
|
||||
if (this->interface.compressible)
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNew((int)this->interface.compressLevel, true));
|
||||
|
||||
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR);
|
||||
protocolCommandParamAdd(command, VARSTR(this->interface.name));
|
||||
protocolCommandParamAdd(command, VARBOOL(this->interface.ignoreMissing));
|
||||
protocolCommandParamAdd(command, ioFilterGroupParamAll(filterGroup));
|
||||
|
||||
// If the file is compressible add decompression filter locally
|
||||
if (this->interface.compressible)
|
||||
{
|
||||
// Since we can't insert filters yet we'll just error if there are already filters in the list
|
||||
CHECK(ioFilterGroupSize(ioReadFilterGroup(storageReadIo(this->read))) == 0);
|
||||
|
||||
ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(this->read)), gzipDecompressNew(true));
|
||||
}
|
||||
|
||||
result = varBool(protocolClientExecute(this->client, command, true));
|
||||
}
|
||||
@ -100,6 +123,10 @@ storageReadRemote(THIS_VOID, Buffer *buffer, bool block)
|
||||
|
||||
if (this->remaining == 0)
|
||||
this->eof = true;
|
||||
|
||||
#ifdef DEBUG
|
||||
this->protocolReadBytes += this->remaining;
|
||||
#endif
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
}
|
||||
@ -147,30 +174,36 @@ storageReadRemoteEof(THIS_VOID)
|
||||
New object
|
||||
***********************************************************************************************************************************/
|
||||
StorageRead *
|
||||
storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing)
|
||||
storageReadRemoteNew(
|
||||
StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
|
||||
unsigned int compressLevel)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage);
|
||||
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client);
|
||||
FUNCTION_LOG_PARAM(STRING, name);
|
||||
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
|
||||
FUNCTION_LOG_PARAM(BOOL, compressible);
|
||||
FUNCTION_LOG_PARAM(UINT, compressLevel);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(storage != NULL);
|
||||
ASSERT(client != NULL);
|
||||
ASSERT(name != NULL);
|
||||
|
||||
StorageRead *this = NULL;
|
||||
StorageReadRemote *this = NULL;
|
||||
|
||||
MEM_CONTEXT_NEW_BEGIN("StorageReadRemote")
|
||||
{
|
||||
StorageReadRemote *driver = memNew(sizeof(StorageReadRemote));
|
||||
driver->memContext = MEM_CONTEXT_NEW();
|
||||
this = memNew(sizeof(StorageReadRemote));
|
||||
this->memContext = MEM_CONTEXT_NEW();
|
||||
|
||||
driver->interface = (StorageReadInterface)
|
||||
this->interface = (StorageReadInterface)
|
||||
{
|
||||
.type = STORAGE_REMOTE_TYPE_STR,
|
||||
.name = strDup(name),
|
||||
.compressible = compressible,
|
||||
.compressLevel = compressLevel,
|
||||
.ignoreMissing = ignoreMissing,
|
||||
|
||||
.ioInterface = (IoReadInterface)
|
||||
@ -181,12 +214,13 @@ storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const Strin
|
||||
},
|
||||
};
|
||||
|
||||
driver->storage = storage;
|
||||
driver->client = client;
|
||||
this->storage = storage;
|
||||
this->client = client;
|
||||
|
||||
this = storageReadNew(driver, &driver->interface);
|
||||
this->read = storageReadNew(this, &this->interface);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(STORAGE_READ, this);
|
||||
ASSERT(this != NULL);
|
||||
FUNCTION_LOG_RETURN(STORAGE_READ, this->read);
|
||||
}
|
||||
|
@ -11,6 +11,8 @@ Remote Storage Read
|
||||
/***********************************************************************************************************************************
|
||||
Constructor
|
||||
***********************************************************************************************************************************/
|
||||
StorageRead *storageReadRemoteNew(StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing);
|
||||
StorageRead *storageReadRemoteNew(
|
||||
StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
|
||||
unsigned int compressLevel);
|
||||
|
||||
#endif
|
||||
|
@ -24,6 +24,7 @@ struct StorageRemote
|
||||
{
|
||||
MemContext *memContext;
|
||||
ProtocolClient *client; // Protocol client
|
||||
unsigned int compressLevel; // Protocol compression level
|
||||
};
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@ -113,7 +114,7 @@ storageRemoteList(THIS_VOID, const String *path, const String *expression)
|
||||
New file read object
|
||||
***********************************************************************************************************************************/
|
||||
static StorageRead *
|
||||
storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing)
|
||||
storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible)
|
||||
{
|
||||
THIS(StorageRemote);
|
||||
|
||||
@ -121,11 +122,15 @@ storageRemoteNewRead(THIS_VOID, const String *file, bool ignoreMissing)
|
||||
FUNCTION_LOG_PARAM(STORAGE_REMOTE, this);
|
||||
FUNCTION_LOG_PARAM(STRING, file);
|
||||
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
|
||||
FUNCTION_LOG_PARAM(BOOL, compressible);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
||||
FUNCTION_LOG_RETURN(STORAGE_READ, storageReadRemoteNew(this, this->client, file, ignoreMissing));
|
||||
FUNCTION_LOG_RETURN(
|
||||
STORAGE_READ,
|
||||
storageReadRemoteNew(
|
||||
this, this->client, file, ignoreMissing, this->compressLevel > 0 ? compressible : false, this->compressLevel));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@ -134,7 +139,7 @@ New file write object
|
||||
static StorageWrite *
|
||||
storageRemoteNewWrite(
|
||||
THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified,
|
||||
bool createPath, bool syncFile, bool syncPath, bool atomic)
|
||||
bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible)
|
||||
{
|
||||
THIS(StorageRemote);
|
||||
|
||||
@ -150,6 +155,7 @@ storageRemoteNewWrite(
|
||||
FUNCTION_LOG_PARAM(BOOL, syncFile);
|
||||
FUNCTION_LOG_PARAM(BOOL, syncPath);
|
||||
FUNCTION_LOG_PARAM(BOOL, atomic);
|
||||
FUNCTION_LOG_PARAM(BOOL, compressible);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
@ -158,7 +164,8 @@ storageRemoteNewWrite(
|
||||
FUNCTION_LOG_RETURN(
|
||||
STORAGE_WRITE,
|
||||
storageWriteRemoteNew(
|
||||
this, this->client, file, modeFile, modePath, user, group, timeModified, createPath, syncFile, syncPath, atomic));
|
||||
this, this->client, file, modeFile, modePath, user, group, timeModified, createPath, syncFile, syncPath, atomic,
|
||||
this->compressLevel > 0 ? compressible : false, this->compressLevel));
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@ -319,7 +326,8 @@ New object
|
||||
***********************************************************************************************************************************/
|
||||
Storage *
|
||||
storageRemoteNew(
|
||||
mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client)
|
||||
mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client,
|
||||
unsigned int compressLevel)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(MODE, modeFile);
|
||||
@ -327,6 +335,7 @@ storageRemoteNew(
|
||||
FUNCTION_LOG_PARAM(BOOL, write);
|
||||
FUNCTION_LOG_PARAM(FUNCTIONP, pathExpressionFunction);
|
||||
FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client);
|
||||
FUNCTION_LOG_PARAM(UINT, compressLevel);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(modeFile != 0);
|
||||
@ -340,6 +349,7 @@ storageRemoteNew(
|
||||
StorageRemote *driver = memNew(sizeof(StorageRemote));
|
||||
driver->memContext = MEM_CONTEXT_NEW();
|
||||
driver->client = client;
|
||||
driver->compressLevel = compressLevel;
|
||||
|
||||
uint64_t feature = 0;
|
||||
|
||||
|
@ -17,6 +17,7 @@ Storage type
|
||||
Constructor
|
||||
***********************************************************************************************************************************/
|
||||
Storage *storageRemoteNew(
|
||||
mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client);
|
||||
mode_t modeFile, mode_t modePath, bool write, StoragePathExpressionCallback pathExpressionFunction, ProtocolClient *client,
|
||||
unsigned int compressLevel);
|
||||
|
||||
#endif
|
||||
|
@ -3,6 +3,8 @@ Remote Storage File write
|
||||
***********************************************************************************************************************************/
|
||||
#include "build.auto.h"
|
||||
|
||||
#include "common/compress/gzip/compress.h"
|
||||
#include "common/compress/gzip/decompress.h"
|
||||
#include "common/debug.h"
|
||||
#include "common/io/write.intern.h"
|
||||
#include "common/log.h"
|
||||
@ -23,7 +25,12 @@ typedef struct StorageWriteRemote
|
||||
MemContext *memContext; // Object mem context
|
||||
StorageWriteInterface interface; // Interface
|
||||
StorageRemote *storage; // Storage that created this object
|
||||
StorageWrite *write; // Storage write interface
|
||||
ProtocolClient *client; // Protocol client to make requests with
|
||||
|
||||
#ifdef DEBUG
|
||||
uint64_t protocolWriteBytes; // How many bytes were written to the protocol layer?
|
||||
#endif
|
||||
} StorageWriteRemote;
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@ -61,6 +68,12 @@ storageWriteRemoteOpen(THIS_VOID)
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
|
||||
// If the file is compressible add decompression filter on the remote
|
||||
if (this->interface.compressible)
|
||||
ioFilterGroupAdd(filterGroup, gzipDecompressNew(true));
|
||||
|
||||
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR);
|
||||
protocolCommandParamAdd(command, VARSTR(this->interface.name));
|
||||
protocolCommandParamAdd(command, VARUINT(this->interface.modeFile));
|
||||
@ -72,6 +85,14 @@ storageWriteRemoteOpen(THIS_VOID)
|
||||
protocolCommandParamAdd(command, VARBOOL(this->interface.syncFile));
|
||||
protocolCommandParamAdd(command, VARBOOL(this->interface.syncPath));
|
||||
protocolCommandParamAdd(command, VARBOOL(this->interface.atomic));
|
||||
protocolCommandParamAdd(command, ioFilterGroupParamAll(filterGroup));
|
||||
|
||||
// If the file is compressible add compression filter locally
|
||||
if (this->interface.compressible)
|
||||
{
|
||||
ioFilterGroupAdd(
|
||||
ioWriteFilterGroup(storageWriteIo(this->write)), gzipCompressNew((int)this->interface.compressLevel, true));
|
||||
}
|
||||
|
||||
protocolClientExecute(this->client, command, false);
|
||||
|
||||
@ -103,6 +124,10 @@ storageWriteRemote(THIS_VOID, const Buffer *buffer)
|
||||
ioWrite(protocolClientIoWrite(this->client), buffer);
|
||||
ioWriteFlush(protocolClientIoWrite(this->client));
|
||||
|
||||
#ifdef DEBUG
|
||||
this->protocolWriteBytes += bufUsed(buffer);
|
||||
#endif
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
||||
@ -140,7 +165,8 @@ Create a new file
|
||||
StorageWrite *
|
||||
storageWriteRemoteNew(
|
||||
StorageRemote *storage, ProtocolClient *client, const String *name, mode_t modeFile, mode_t modePath, const String *user,
|
||||
const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic)
|
||||
const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible,
|
||||
unsigned int compressLevel)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelTrace);
|
||||
FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage);
|
||||
@ -154,6 +180,8 @@ storageWriteRemoteNew(
|
||||
FUNCTION_LOG_PARAM(BOOL, syncFile);
|
||||
FUNCTION_LOG_PARAM(BOOL, syncPath);
|
||||
FUNCTION_LOG_PARAM(BOOL, atomic);
|
||||
FUNCTION_LOG_PARAM(BOOL, compressible);
|
||||
FUNCTION_LOG_PARAM(UINT, compressLevel);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(storage != NULL);
|
||||
@ -162,18 +190,20 @@ storageWriteRemoteNew(
|
||||
ASSERT(modeFile != 0);
|
||||
ASSERT(modePath != 0);
|
||||
|
||||
StorageWrite *this = NULL;
|
||||
StorageWriteRemote *this = NULL;
|
||||
|
||||
MEM_CONTEXT_NEW_BEGIN("StorageWriteRemote")
|
||||
{
|
||||
StorageWriteRemote *driver = memNew(sizeof(StorageWriteRemote));
|
||||
driver->memContext = MEM_CONTEXT_NEW();
|
||||
this = memNew(sizeof(StorageWriteRemote));
|
||||
this->memContext = MEM_CONTEXT_NEW();
|
||||
|
||||
driver->interface = (StorageWriteInterface)
|
||||
this->interface = (StorageWriteInterface)
|
||||
{
|
||||
.type = STORAGE_REMOTE_TYPE_STR,
|
||||
.name = strDup(name),
|
||||
.atomic = atomic,
|
||||
.compressible = compressible,
|
||||
.compressLevel = compressLevel,
|
||||
.createPath = createPath,
|
||||
.group = strDup(group),
|
||||
.modeFile = modeFile,
|
||||
@ -191,12 +221,13 @@ storageWriteRemoteNew(
|
||||
},
|
||||
};
|
||||
|
||||
driver->storage = storage;
|
||||
driver->client = client;
|
||||
this->storage = storage;
|
||||
this->client = client;
|
||||
|
||||
this = storageWriteNew(driver, &driver->interface);
|
||||
this->write = storageWriteNew(this, &this->interface);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(STORAGE_WRITE, this);
|
||||
ASSERT(this != NULL);
|
||||
FUNCTION_LOG_RETURN(STORAGE_WRITE, this->write);
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ Constructor
|
||||
***********************************************************************************************************************************/
|
||||
StorageWrite *storageWriteRemoteNew(
|
||||
StorageRemote *storage, ProtocolClient *client, const String *name, mode_t modeFile, mode_t modePath, const String *user,
|
||||
const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic);
|
||||
const String *group, time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible,
|
||||
unsigned int compressLevel);
|
||||
|
||||
#endif
|
||||
|
@ -657,7 +657,7 @@ storageS3List(THIS_VOID, const String *path, const String *expression)
|
||||
New file read object
|
||||
***********************************************************************************************************************************/
|
||||
static StorageRead *
|
||||
storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing)
|
||||
storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing, bool compressible)
|
||||
{
|
||||
THIS(StorageS3);
|
||||
|
||||
@ -665,6 +665,7 @@ storageS3NewRead(THIS_VOID, const String *file, bool ignoreMissing)
|
||||
FUNCTION_LOG_PARAM(STORAGE_S3, this);
|
||||
FUNCTION_LOG_PARAM(STRING, file);
|
||||
FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
|
||||
(void)compressible;
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
@ -679,7 +680,7 @@ New file write object
|
||||
static StorageWrite *
|
||||
storageS3NewWrite(
|
||||
THIS_VOID, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group, time_t timeModified,
|
||||
bool createPath, bool syncFile, bool syncPath, bool atomic)
|
||||
bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible)
|
||||
{
|
||||
THIS(StorageS3);
|
||||
|
||||
@ -695,6 +696,7 @@ storageS3NewWrite(
|
||||
FUNCTION_LOG_PARAM(BOOL, syncFile);
|
||||
FUNCTION_LOG_PARAM(BOOL, syncPath);
|
||||
FUNCTION_LOG_PARAM(BOOL, atomic);
|
||||
(void)compressible;
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
|
@ -407,7 +407,7 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p
|
||||
FUNCTION_LOG_PARAM(STORAGE, this);
|
||||
FUNCTION_LOG_PARAM(STRING, fileExp);
|
||||
FUNCTION_LOG_PARAM(BOOL, param.ignoreMissing);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, param.filterGroup);
|
||||
FUNCTION_LOG_PARAM(BOOL, param.compressible);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
@ -416,12 +416,9 @@ storageNewRead(const Storage *this, const String *fileExp, StorageNewReadParam p
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
result = this->interface.newRead(this->driver, storagePathNP(this, fileExp), param.ignoreMissing);
|
||||
|
||||
if (param.filterGroup != NULL)
|
||||
ioReadFilterGroupSet(storageReadIo(result), param.filterGroup);
|
||||
|
||||
storageReadMove(result, MEM_CONTEXT_OLD());
|
||||
result = storageReadMove(
|
||||
this->interface.newRead(this->driver, storagePathNP(this, fileExp), param.ignoreMissing, param.compressible),
|
||||
MEM_CONTEXT_OLD());
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
@ -446,7 +443,7 @@ storageNewWrite(const Storage *this, const String *fileExp, StorageNewWriteParam
|
||||
FUNCTION_LOG_PARAM(BOOL, param.noSyncFile);
|
||||
FUNCTION_LOG_PARAM(BOOL, param.noSyncPath);
|
||||
FUNCTION_LOG_PARAM(BOOL, param.noAtomic);
|
||||
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, param.filterGroup);
|
||||
FUNCTION_LOG_PARAM(BOOL, param.compressible);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(this != NULL);
|
||||
@ -456,15 +453,12 @@ storageNewWrite(const Storage *this, const String *fileExp, StorageNewWriteParam
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
result = this->interface.newWrite(
|
||||
result = storageWriteMove(
|
||||
this->interface.newWrite(
|
||||
this->driver, storagePathNP(this, fileExp), param.modeFile != 0 ? param.modeFile : this->modeFile,
|
||||
param.modePath != 0 ? param.modePath : this->modePath, param.user, param.group, param.timeModified, !param.noCreatePath,
|
||||
!param.noSyncFile, !param.noSyncPath, !param.noAtomic);
|
||||
|
||||
if (param.filterGroup != NULL)
|
||||
ioWriteFilterGroupSet(storageWriteIo(result), param.filterGroup);
|
||||
|
||||
storageWriteMove(result, MEM_CONTEXT_OLD());
|
||||
param.modePath != 0 ? param.modePath : this->modePath, param.user, param.group, param.timeModified,
|
||||
!param.noCreatePath, !param.noSyncFile, !param.noSyncPath, !param.noAtomic, param.compressible),
|
||||
MEM_CONTEXT_OLD());
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
|
@ -146,7 +146,7 @@ storageNewRead
|
||||
typedef struct StorageNewReadParam
|
||||
{
|
||||
bool ignoreMissing;
|
||||
IoFilterGroup *filterGroup;
|
||||
bool compressible;
|
||||
} StorageNewReadParam;
|
||||
|
||||
#define storageNewReadP(this, pathExp, ...) \
|
||||
@ -170,7 +170,7 @@ typedef struct StorageNewWriteParam
|
||||
bool noSyncFile;
|
||||
bool noSyncPath;
|
||||
bool noAtomic;
|
||||
IoFilterGroup *filterGroup;
|
||||
bool compressible;
|
||||
} StorageNewWriteParam;
|
||||
|
||||
#define storageNewWriteP(this, pathExp, ...) \
|
||||
|
@ -57,15 +57,16 @@ typedef struct StorageInterface
|
||||
// Features implemented by the storage driver
|
||||
uint64_t feature;
|
||||
|
||||
bool (*copy)(StorageRead *source, StorageWrite *destination);
|
||||
bool (*exists)(void *driver, const String *file);
|
||||
StorageInfo (*info)(void *driver, const String *path, bool followLink);
|
||||
bool (*infoList)(void *driver, const String *file, StorageInfoListCallback callback, void *callbackData);
|
||||
StringList *(*list)(void *driver, const String *path, const String *expression);
|
||||
bool (*move)(void *driver, StorageRead *source, StorageWrite *destination);
|
||||
StorageRead *(*newRead)(void *driver, const String *file, bool ignoreMissing);
|
||||
StorageRead *(*newRead)(void *driver, const String *file, bool ignoreMissing, bool compressible);
|
||||
StorageWrite *(*newWrite)(
|
||||
void *driver, const String *file, mode_t modeFile, mode_t modePath, const String *user, const String *group,
|
||||
time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic);
|
||||
time_t timeModified, bool createPath, bool syncFile, bool syncPath, bool atomic, bool compressible);
|
||||
void (*pathCreate)(void *driver, const String *path, bool errorOnExists, bool noParentCreate, mode_t mode);
|
||||
bool (*pathExists)(void *driver, const String *path);
|
||||
bool (*pathRemove)(void *driver, const String *path, bool recurse);
|
||||
|
@ -23,6 +23,8 @@ typedef struct StorageWriteInterface
|
||||
|
||||
bool atomic;
|
||||
bool createPath;
|
||||
bool compressible; // Is this file compressible?
|
||||
unsigned int compressLevel; // Level to use for compression
|
||||
const String *group; // Group that owns the file
|
||||
mode_t modeFile;
|
||||
mode_t modePath;
|
||||
|
@ -534,6 +534,8 @@ unit:
|
||||
storage/remote/storage: full
|
||||
storage/remote/write: full
|
||||
storage/helper: full
|
||||
storage/read: full
|
||||
storage/write: full
|
||||
storage/storage: full
|
||||
|
||||
# ----------------------------------------------------------------------------------------------------------------------------
|
||||
|
@ -186,10 +186,9 @@ testRun(void)
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
StorageWrite *infoWrite = storageNewWriteNP(storageTest, strNew("repo/archive/test1/archive.info"));
|
||||
|
||||
ioWriteFilterGroupSet(
|
||||
storageWriteIo(infoWrite),
|
||||
ioFilterGroupAdd(
|
||||
ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("12345678"), NULL)));
|
||||
ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc,
|
||||
BUFSTRDEF("12345678"), NULL));
|
||||
|
||||
storagePutNP(
|
||||
infoWrite,
|
||||
@ -208,11 +207,10 @@ testRun(void)
|
||||
strNew(
|
||||
"repo/archive/test1/10-1/01ABCDEF01ABCDEF/01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.gz"));
|
||||
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(destination));
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false));
|
||||
ioFilterGroupAdd(
|
||||
filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("worstpassphraseever"), NULL));
|
||||
ioWriteFilterGroupSet(storageWriteIo(destination), filterGroup);
|
||||
storagePutNP(destination, buffer);
|
||||
|
||||
TEST_RESULT_INT(
|
||||
|
@ -373,10 +373,9 @@ testRun(void)
|
||||
|
||||
StorageWrite *infoWrite = storageNewWriteNP(storageTest, strNew("repo/archive/test/archive.info"));
|
||||
|
||||
ioWriteFilterGroupSet(
|
||||
storageWriteIo(infoWrite),
|
||||
ioFilterGroupAdd(
|
||||
ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("badpassphrase"), NULL)));
|
||||
ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc,
|
||||
BUFSTRDEF("badpassphrase"), NULL));
|
||||
|
||||
storagePutNP(
|
||||
infoWrite,
|
||||
|
@ -139,9 +139,8 @@ testRun(void)
|
||||
bufUsedSet(buffer, bufSize(buffer));
|
||||
memset(bufPtr(buffer), 0, bufSize(buffer));
|
||||
|
||||
IoWrite *write = ioWriteFilterGroupSet(
|
||||
ioBufferWriteNew(bufferOut),
|
||||
ioFilterGroupAdd(ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0)));
|
||||
IoWrite *write = ioBufferWriteNew(bufferOut);
|
||||
ioFilterGroupAdd(ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0));
|
||||
ioWriteOpen(write);
|
||||
ioWrite(write, buffer);
|
||||
ioWriteClose(write);
|
||||
@ -161,10 +160,9 @@ testRun(void)
|
||||
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x00)))->pd_lsn.walid = 0xF0F0F0F0;
|
||||
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x00)))->pd_lsn.xrecoff = 0xF0F0F0F0;
|
||||
|
||||
write = ioWriteFilterGroupSet(
|
||||
ioBufferWriteNew(bufferOut),
|
||||
write = ioBufferWriteNew(bufferOut);
|
||||
ioFilterGroupAdd(
|
||||
ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)));
|
||||
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
|
||||
ioWriteOpen(write);
|
||||
ioWrite(write, buffer);
|
||||
ioWriteClose(write);
|
||||
@ -209,10 +207,9 @@ testRun(void)
|
||||
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x07)))->pd_upper = 0x01;
|
||||
((PageHeaderData *)(bufPtr(buffer) + (PG_PAGE_SIZE_DEFAULT * 0x07)))->pd_lsn.xrecoff = 0x7;
|
||||
|
||||
write = ioWriteFilterGroupSet(
|
||||
ioBufferWriteNew(bufferOut),
|
||||
write = ioBufferWriteNew(bufferOut);
|
||||
ioFilterGroupAdd(
|
||||
ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)));
|
||||
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
|
||||
ioWriteOpen(write);
|
||||
ioWrite(write, buffer);
|
||||
ioWriteClose(write);
|
||||
@ -227,10 +224,9 @@ testRun(void)
|
||||
bufUsedSet(buffer, bufSize(buffer));
|
||||
memset(bufPtr(buffer), 0, bufSize(buffer));
|
||||
|
||||
write = ioWriteFilterGroupSet(
|
||||
ioBufferWriteNew(bufferOut),
|
||||
write = ioBufferWriteNew(bufferOut);
|
||||
ioFilterGroupAdd(
|
||||
ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)));
|
||||
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
|
||||
ioWriteOpen(write);
|
||||
ioWrite(write, buffer);
|
||||
ioWriteClose(write);
|
||||
@ -245,10 +241,9 @@ testRun(void)
|
||||
bufUsedSet(buffer, bufSize(buffer));
|
||||
memset(bufPtr(buffer), 0, bufSize(buffer));
|
||||
|
||||
write = ioWriteFilterGroupSet(
|
||||
ioBufferWriteNew(bufferOut),
|
||||
write = ioBufferWriteNew(bufferOut);
|
||||
ioFilterGroupAdd(
|
||||
ioFilterGroupNew(), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000)));
|
||||
ioWriteFilterGroup(write), pageChecksumNew(0, PG_SEGMENT_PAGE_DEFAULT, PG_PAGE_SIZE_DEFAULT, 0xFACEFACE00000000));
|
||||
ioWriteOpen(write);
|
||||
ioWrite(write, buffer);
|
||||
TEST_ERROR(ioWrite(write, buffer), AssertError, "should not be possible to see two misaligned pages in a row");
|
||||
|
@ -65,10 +65,9 @@ testRun(void)
|
||||
// Create a compressed encrypted repo file
|
||||
StorageWrite *ceRepoFile = storageNewWriteNP(
|
||||
storageRepoWrite(), strNewFmt(STORAGE_REPO_BACKUP "/%s/%s.gz", strPtr(repoFileReferenceFull), strPtr(repoFile1)));
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
IoFilterGroup *filterGroup = ioWriteFilterGroup(storageWriteIo(ceRepoFile));
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false));
|
||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("badpass"), NULL));
|
||||
ioWriteFilterGroupSet(storageWriteIo(ceRepoFile), filterGroup);
|
||||
|
||||
storagePutNP(ceRepoFile, BUFSTRDEF("acefile"));
|
||||
|
||||
|
@ -16,10 +16,8 @@ testCompress(IoFilter *compress, Buffer *decompressed, size_t inputSize, size_t
|
||||
size_t inputTotal = 0;
|
||||
ioBufferSizeSet(outputSize);
|
||||
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
ioFilterGroupAdd(filterGroup, compress);
|
||||
IoWrite *write = ioBufferWriteNew(compressed);
|
||||
ioWriteFilterGroupSet(write, filterGroup);
|
||||
ioFilterGroupAdd(ioWriteFilterGroup(write), compress);
|
||||
ioWriteOpen(write);
|
||||
|
||||
// Compress input data
|
||||
@ -52,10 +50,8 @@ testDecompress(IoFilter *decompress, Buffer *compressed, size_t inputSize, size_
|
||||
Buffer *output = bufNew(outputSize);
|
||||
ioBufferSizeSet(inputSize);
|
||||
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
ioFilterGroupAdd(filterGroup, decompress);
|
||||
IoRead *read = ioBufferReadNew(compressed);
|
||||
ioReadFilterGroupSet(read, filterGroup);
|
||||
ioFilterGroupAdd(ioReadFilterGroup(read), decompress);
|
||||
ioReadOpen(read);
|
||||
|
||||
while (!ioReadEof(read))
|
||||
@ -110,8 +106,15 @@ testRun(void)
|
||||
Buffer *compressed = NULL;
|
||||
Buffer *decompressed = bufNewC(simpleData, strlen(simpleData));
|
||||
|
||||
VariantList *compressParamList = varLstNew();
|
||||
varLstAdd(compressParamList, varNewUInt(3));
|
||||
varLstAdd(compressParamList, varNewBool(false));
|
||||
|
||||
VariantList *decompressParamList = varLstNew();
|
||||
varLstAdd(decompressParamList, varNewBool(false));
|
||||
|
||||
TEST_ASSIGN(
|
||||
compressed, testCompress(gzipCompressNew(3, false), decompressed, 1024, 1024),
|
||||
compressed, testCompress(gzipCompressNewVar(compressParamList), decompressed, 1024, 1024),
|
||||
"simple data - compress large in/large out buffer");
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
@ -127,7 +130,7 @@ testRun(void)
|
||||
"simple data - compress small in/small out buffer");
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
bufEq(decompressed, testDecompress(gzipDecompressNew(false), compressed, 1024, 1024)), true,
|
||||
bufEq(decompressed, testDecompress(gzipDecompressNewVar(decompressParamList), compressed, 1024, 1024)), true,
|
||||
"simple data - decompress large in/large out buffer");
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
|
@ -3,6 +3,8 @@ Test IO
|
||||
***********************************************************************************************************************************/
|
||||
#include <fcntl.h>
|
||||
|
||||
#include "common/type/json.h"
|
||||
|
||||
#include "common/harnessFork.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
@ -111,7 +113,7 @@ ioTestFilterSizeNew(const char *type)
|
||||
IoTestFilterSize *driver = memNew(sizeof(IoTestFilterSize));
|
||||
driver->memContext = MEM_CONTEXT_NEW();
|
||||
|
||||
this = ioFilterNewP(strNew(type), driver, .in = ioTestFilterSizeProcess, .result = ioTestFilterSizeResult);
|
||||
this = ioFilterNewP(strNew(type), driver, NULL, .in = ioTestFilterSizeProcess, .result = ioTestFilterSizeResult);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
|
||||
@ -216,8 +218,13 @@ ioTestFilterMultiplyNew(const char *type, unsigned int multiplier, unsigned int
|
||||
driver->flushTotal = flushTotal;
|
||||
driver->flushChar = flushChar;
|
||||
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewStrZ(type));
|
||||
varLstAdd(paramList, varNewUInt(multiplier));
|
||||
varLstAdd(paramList, varNewUInt(flushTotal));
|
||||
|
||||
this = ioFilterNewP(
|
||||
strNew(type), driver, .done = ioTestFilterMultiplyDone, .inOut = ioTestFilterMultiplyProcess,
|
||||
strNew(type), driver, paramList, .done = ioTestFilterMultiplyDone, .inOut = ioTestFilterMultiplyProcess,
|
||||
.inputSame = ioTestFilterMultiplyInputSame);
|
||||
}
|
||||
MEM_CONTEXT_NEW_END();
|
||||
@ -285,18 +292,19 @@ testRun(void)
|
||||
bufferOriginal = bufNewC("123", 3);
|
||||
|
||||
TEST_ASSIGN(bufferRead, ioBufferReadNew(bufferOriginal), "create buffer read object");
|
||||
IoFilterGroup *filterGroup = NULL;
|
||||
TEST_RESULT_VOID(ioFilterGroupMove(filterGroup, memContextTop()), "move null filter group is a noop");
|
||||
TEST_ASSIGN(filterGroup, ioFilterGroupNew(), " create new filter group");
|
||||
IoFilter *sizeFilter = ioSizeNew();
|
||||
TEST_RESULT_PTR(ioFilterGroupAdd(filterGroup, sizeFilter), filterGroup, " add filter to filter group");
|
||||
TEST_RESULT_PTR(
|
||||
ioFilterGroupAdd(ioReadFilterGroup(bufferRead), sizeFilter), bufferRead->filterGroup, " add filter to filter group");
|
||||
TEST_RESULT_VOID(
|
||||
ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("double", 2, 3, 'X')), " add filter to filter group");
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, ioSizeNew()), " add filter to filter group");
|
||||
ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioTestFilterMultiplyNew("double", 2, 3, 'X')),
|
||||
" add filter to filter group");
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioSizeNew()), " add filter to filter group");
|
||||
IoFilter *bufferFilter = ioBufferNew();
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, bufferFilter), " add filter to filter group");
|
||||
TEST_RESULT_VOID(ioReadFilterGroupSet(bufferRead, filterGroup), " add filter group to read io");
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(ioReadFilterGroup(bufferRead), bufferFilter), " add filter to filter group");
|
||||
TEST_RESULT_PTR(ioFilterMove(NULL, memContextTop()), NULL, " move NULL filter to top context");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(jsonFromVar(ioFilterGroupParamAll(ioReadFilterGroup(bufferRead)), 0)),
|
||||
"{\"buffer\":null,\"double\":[\"double\",2,3],\"size\":null}", " check filter params");
|
||||
|
||||
TEST_RESULT_BOOL(ioReadOpen(bufferRead), true, " open");
|
||||
TEST_RESULT_INT(ioReadHandle(bufferRead), -1, " handle invalid");
|
||||
@ -322,21 +330,26 @@ testRun(void)
|
||||
TEST_RESULT_BOOL(ioBufferRead(ioReadDriver(bufferRead), buffer, true), 0, " eof from driver");
|
||||
TEST_RESULT_SIZE(ioRead(bufferRead, buffer), 0, " read 0 bytes");
|
||||
TEST_RESULT_VOID(ioReadClose(bufferRead), " close buffer read object");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(jsonFromVar(ioFilterGroupResultAll(ioReadFilterGroup(bufferRead)), 0)),
|
||||
"{\"buffer\":null,\"double\":null,\"size\":[3,9]}",
|
||||
" check filter result all");
|
||||
|
||||
TEST_RESULT_PTR(ioReadFilterGroup(bufferRead), filterGroup, " check filter group");
|
||||
TEST_RESULT_PTR(ioReadFilterGroup(bufferRead), ioReadFilterGroup(bufferRead), " check filter group");
|
||||
TEST_RESULT_UINT(
|
||||
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 0)), 3,
|
||||
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(ioReadFilterGroup(bufferRead), ioFilterType(sizeFilter))), 0)), 3,
|
||||
" check filter result");
|
||||
TEST_RESULT_PTR(ioFilterGroupResult(filterGroup, strNew("double")), NULL, " check filter result is NULL");
|
||||
TEST_RESULT_PTR(
|
||||
ioFilterGroupResult(ioReadFilterGroup(bufferRead), strNew("double")), NULL, " check filter result is NULL");
|
||||
TEST_RESULT_UINT(
|
||||
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(filterGroup, ioFilterType(sizeFilter))), 1)), 9,
|
||||
varUInt64(varLstGet(varVarLst(ioFilterGroupResult(ioReadFilterGroup(bufferRead), ioFilterType(sizeFilter))), 1)), 9,
|
||||
" check filter result");
|
||||
|
||||
TEST_RESULT_PTR(ioFilterDriver(bufferFilter), bufferFilter->driver, " check filter driver");
|
||||
TEST_RESULT_PTR(ioFilterInterface(bufferFilter), &bufferFilter->interface, " check filter interface");
|
||||
|
||||
TEST_RESULT_VOID(ioFilterFree(bufferFilter), " free buffer filter");
|
||||
TEST_RESULT_VOID(ioFilterGroupFree(filterGroup), " free filter group object");
|
||||
TEST_RESULT_VOID(ioFilterGroupFree(ioReadFilterGroup(bufferRead)), " free filter group object");
|
||||
|
||||
// Read a zero-size buffer to ensure filters are still processed even when there is no input. Some filters (e.g. encryption
|
||||
// and compression) will produce output even if there is no input.
|
||||
@ -347,7 +360,7 @@ testRun(void)
|
||||
|
||||
TEST_ASSIGN(bufferRead, ioBufferReadNew(bufferOriginal), "create buffer read object");
|
||||
TEST_RESULT_VOID(
|
||||
ioReadFilterGroupSet(bufferRead, ioFilterGroupAdd(ioFilterGroupNew(), ioTestFilterMultiplyNew("double", 2, 5, 'Y'))),
|
||||
ioFilterGroupAdd(ioReadFilterGroup(bufferRead), ioTestFilterMultiplyNew("double", 2, 5, 'Y')),
|
||||
" add filter that produces output with no input");
|
||||
TEST_RESULT_BOOL(ioReadOpen(bufferRead), true, " open read");
|
||||
TEST_RESULT_UINT(ioRead(bufferRead, buffer), 5, " read 5 chars");
|
||||
@ -437,8 +450,7 @@ testRun(void)
|
||||
Buffer *buffer = bufNew(0);
|
||||
|
||||
TEST_ASSIGN(bufferWrite, ioBufferWriteNew(buffer), "create buffer write object");
|
||||
IoFilterGroup *filterGroup = NULL;
|
||||
TEST_ASSIGN(filterGroup, ioFilterGroupNew(), " create new filter group");
|
||||
IoFilterGroup *filterGroup = ioWriteFilterGroup(bufferWrite);
|
||||
IoFilter *sizeFilter = ioSizeNew();
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, sizeFilter), " add filter to filter group");
|
||||
TEST_RESULT_VOID(
|
||||
@ -447,7 +459,6 @@ testRun(void)
|
||||
ioFilterGroupAdd(filterGroup, ioTestFilterMultiplyNew("single", 1, 1, 'Y')),
|
||||
" add filter to filter group");
|
||||
TEST_RESULT_VOID(ioFilterGroupAdd(filterGroup, ioTestFilterSizeNew("size2")), " add filter to filter group");
|
||||
TEST_RESULT_VOID(ioWriteFilterGroupSet(bufferWrite, filterGroup), " add filter group to write io");
|
||||
|
||||
TEST_RESULT_VOID(ioWriteOpen(bufferWrite), " open buffer write object");
|
||||
TEST_RESULT_INT(ioWriteHandle(bufferWrite), -1, " handle invalid");
|
||||
|
@ -62,10 +62,9 @@ testRun(void)
|
||||
//--------------------------------------------------------------------------------------------------------------------------
|
||||
StorageWrite *infoWrite = storageNewWriteNP(storageLocalWrite(), fileName);
|
||||
|
||||
ioWriteFilterGroupSet(
|
||||
storageWriteIo(infoWrite),
|
||||
ioFilterGroupAdd(
|
||||
ioFilterGroupNew(), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("12345678"), NULL)));
|
||||
ioWriteFilterGroup(storageWriteIo(infoWrite)), cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc,
|
||||
BUFSTRDEF("12345678"), NULL));
|
||||
|
||||
storageRemoveNP(storageLocalWrite(), fileNameCopy);
|
||||
storagePutNP(
|
||||
|
@ -604,11 +604,6 @@ testRun(void)
|
||||
TEST_RESULT_INT(
|
||||
ioReadHandle(storageReadIo(file)), ((StorageReadPosix *)file->driver)->handle, "check read handle");
|
||||
TEST_RESULT_VOID(ioReadClose(storageReadIo(file)), " close file");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
TEST_ASSIGN(file, storageNewReadP(storageTest, fileName, .filterGroup = filterGroup), "new read file with filters");
|
||||
TEST_RESULT_PTR(ioReadFilterGroup(storageReadIo(file)), filterGroup, " check filter group is set");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
@ -670,13 +665,6 @@ testRun(void)
|
||||
TEST_RESULT_VOID(storageWritePosixClose(storageWriteDriver(file)), " close file again");
|
||||
TEST_RESULT_INT(storageInfoNP(storageTest, strPath(fileName)).mode, 0700, " check path mode");
|
||||
TEST_RESULT_INT(storageInfoNP(storageTest, fileName).mode, 0600, " check file mode");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
TEST_ASSIGN(file, storageNewWriteP(storageTest, fileName, .filterGroup = filterGroup), "new write file with filters");
|
||||
TEST_RESULT_VOID(ioWriteOpen(storageWriteIo(file)), " open file");
|
||||
TEST_RESULT_VOID(ioWriteClose(storageWriteIo(file)), " close file");
|
||||
TEST_RESULT_PTR(ioWriteFilterGroup(storageWriteIo(file)), filterGroup, " check filter group is set");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
|
@ -1,6 +1,7 @@
|
||||
/***********************************************************************************************************************************
|
||||
Test Remote Storage
|
||||
***********************************************************************************************************************************/
|
||||
#include "common/crypto/cipherBlock.h"
|
||||
#include "common/io/bufferRead.h"
|
||||
#include "common/io/bufferWrite.h"
|
||||
|
||||
@ -139,6 +140,9 @@ testRun(void)
|
||||
|
||||
storagePutNP(storageNewWriteNP(storageTest, strNew("repo/test.txt")), contentBuf);
|
||||
|
||||
// Disable protocol compression in the storage object to test no compression
|
||||
((StorageRemote *)storageRemote->driver)->compressLevel = 0;
|
||||
|
||||
StorageRead *fileRead = NULL;
|
||||
|
||||
ioBufferSizeSet(8193);
|
||||
@ -150,8 +154,20 @@ testRun(void)
|
||||
storageReadRemote(storageRead(fileRead), bufNew(32), false), 0,
|
||||
"nothing more to read");
|
||||
|
||||
TEST_ASSIGN(fileRead, storageNewReadNP(storageRemote, strNew("test.txt")), "get file");
|
||||
TEST_RESULT_BOOL(bufEq(storageGetNP(fileRead), contentBuf), true, " check contents");
|
||||
TEST_RESULT_UINT(((StorageReadRemote *)fileRead->driver)->protocolReadBytes, bufSize(contentBuf), " check read size");
|
||||
|
||||
// Enable protocol compression in the storage object
|
||||
((StorageRemote *)storageRemote->driver)->compressLevel = 3;
|
||||
|
||||
TEST_ASSIGN(
|
||||
fileRead, storageNewReadP(storageRemote, strNew("test.txt"), .compressible = true), "get file (protocol compress)");
|
||||
TEST_RESULT_BOOL(bufEq(storageGetNP(fileRead), contentBuf), true, " check contents");
|
||||
// We don't know how much protocol compression there will be exactly, but make sure this is some
|
||||
TEST_RESULT_BOOL(
|
||||
bufEq(storageGetNP(storageNewReadNP(storageRemote, strNew("test.txt"))), contentBuf), true, "get file again");
|
||||
((StorageReadRemote *)fileRead->driver)->protocolReadBytes < bufSize(contentBuf), true,
|
||||
" check compressed read size");
|
||||
|
||||
TEST_ERROR(
|
||||
storageRemoteProtocolBlockSize(strNew("bogus")), ProtocolError, "'bogus' is not a valid block size message");
|
||||
@ -161,6 +177,7 @@ testRun(void)
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewStr(strNew("missing.txt")));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewKv(kvNew()));
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), true,
|
||||
@ -178,6 +195,12 @@ testRun(void)
|
||||
varLstAdd(paramList, varNewStr(strNew("test.txt")));
|
||||
varLstAdd(paramList, varNewBool(false));
|
||||
|
||||
// Create filters to test filter logic
|
||||
IoFilterGroup *filterGroup = ioFilterGroupNew();
|
||||
ioFilterGroupAdd(filterGroup, gzipCompressNew(3, false));
|
||||
ioFilterGroupAdd(filterGroup, gzipDecompressNew(false));
|
||||
varLstAdd(paramList, ioFilterGroupParamAll(filterGroup));
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), true, "protocol open read");
|
||||
TEST_RESULT_STR(
|
||||
@ -190,6 +213,21 @@ testRun(void)
|
||||
|
||||
bufUsedSet(serverWrite, 0);
|
||||
ioBufferSizeSet(8192);
|
||||
|
||||
// Check for error on a bogus filter
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewStr(strNew("test.txt")));
|
||||
varLstAdd(paramList, varNewBool(false));
|
||||
|
||||
// Create filters to test filter logic
|
||||
filterGroup = ioFilterGroupNew();
|
||||
ioFilterGroupAdd(filterGroup, cipherBlockNew(cipherModeEncrypt, cipherTypeAes256Cbc, BUFSTRDEF("X"), NULL));
|
||||
varLstAdd(paramList, ioFilterGroupParamAll(filterGroup));
|
||||
|
||||
TEST_ERROR(
|
||||
storageRemoteProtocol(
|
||||
PROTOCOL_COMMAND_STORAGE_OPEN_READ_STR, paramList, server), AssertError, "unable to add filter 'cipherBlock'");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
@ -213,6 +251,9 @@ testRun(void)
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
ioBufferSizeSet(9999);
|
||||
|
||||
// Disable protocol compression in the storage object to test no compression
|
||||
((StorageRemote *)storageRemote->driver)->compressLevel = 0;
|
||||
|
||||
StorageWrite *write = NULL;
|
||||
TEST_ASSIGN(write, storageNewWriteNP(storageRemote, strNew("test.txt")), "new write file");
|
||||
|
||||
@ -225,6 +266,7 @@ testRun(void)
|
||||
TEST_RESULT_BOOL(storageWriteSyncPath(write), true, "path is synced");
|
||||
|
||||
TEST_RESULT_VOID(storagePutNP(write, contentBuf), "write file");
|
||||
TEST_RESULT_UINT(((StorageWriteRemote *)write->driver)->protocolWriteBytes, bufSize(contentBuf), " check write size");
|
||||
TEST_RESULT_VOID(storageWriteRemoteClose((StorageWriteRemote *)storageWriteDriver(write)), "close file again");
|
||||
TEST_RESULT_VOID(storageWriteFree(write), "free file");
|
||||
|
||||
@ -232,6 +274,9 @@ testRun(void)
|
||||
TEST_RESULT_BOOL(
|
||||
bufEq(storageGetNP(storageNewReadNP(storageRemote, strNew("test.txt"))), contentBuf), true, "check file");
|
||||
|
||||
// Enable protocol compression in the storage object
|
||||
((StorageRemote *)storageRemote->driver)->compressLevel = 3;
|
||||
|
||||
// Write the file again, but this time free it before close and make sure the .tmp file is left
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_ASSIGN(write, storageNewWriteNP(storageRemote, strNew("test2.txt")), "new write file");
|
||||
@ -244,6 +289,14 @@ testRun(void)
|
||||
TEST_RESULT_UINT(
|
||||
storageInfoNP(storageTest, strNew("repo/test2.txt.pgbackrest.tmp")).size, 16384, "file exists and is partial");
|
||||
|
||||
// Write the file again with protocol compression
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_ASSIGN(write, storageNewWriteP(storageRemote, strNew("test2.txt"), .compressible = true), "new write file (compress)");
|
||||
TEST_RESULT_VOID(storagePutNP(write, contentBuf), "write file");
|
||||
TEST_RESULT_BOOL(
|
||||
((StorageWriteRemote *)write->driver)->protocolWriteBytes < bufSize(contentBuf), true,
|
||||
" check compressed write size");
|
||||
|
||||
// Check protocol function directly (complete write)
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
ioBufferSizeSet(10);
|
||||
@ -259,6 +312,7 @@ testRun(void)
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewKv(kvNew()));
|
||||
|
||||
// Generate input (includes the input for the test below -- need a way to reset this for better testing)
|
||||
bufCat(
|
||||
@ -299,6 +353,7 @@ testRun(void)
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewBool(true));
|
||||
varLstAdd(paramList, varNewKv(kvNew()));
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
storageRemoteProtocol(PROTOCOL_COMMAND_STORAGE_OPEN_WRITE_STR, paramList, server), true, "protocol open write");
|
||||
|
Loading…
Reference in New Issue
Block a user