1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-05 16:07:35 +02:00

Decompress restored file by remote agent

This commit is contained in:
Konstantin Knizhnik 2019-07-03 23:22:48 +03:00
parent da8bf6de29
commit 3873318eec
4 changed files with 79 additions and 49 deletions

View File

@ -89,7 +89,7 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
* Decompresses source into dest using algorithm. Returns the number of bytes
* decompressed in the destination buffer, or -1 if decompression fails.
*/
static int32
int32
do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, const char **errormsg)
{
@ -719,6 +719,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
BlockNumber blknum = 0,
truncate_from = 0;
bool need_truncate = false;
size_t rc;
/* BYTES_INVALID allowed only in case of restoring file from DELTA backup */
if (file->write_size != BYTES_INVALID)
@ -750,9 +751,9 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
{
off_t write_pos;
size_t read_len;
DataPage compressed_page; /* used as read buffer */
DataPage page;
int32 uncompressed_size = 0;
int32 compressed_size;
const char *errormsg = NULL;
/* File didn`t changed. Nothig to copy */
if (file->write_size == BYTES_INVALID)
@ -789,7 +790,9 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
blknum, file->path, strerror(errno_tmp));
}
if (header.block == 0 && header.compressed_size == 0)
compressed_size = header.compressed_size;
if (header.block == 0 && compressed_size)
{
elog(VERBOSE, "Skip empty block of \"%s\"", file->path);
continue;
@ -801,7 +804,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
blknum = header.block;
if (header.compressed_size == PageIsTruncated)
if (compressed_size == PageIsTruncated)
{
/*
* Backup contains information that this block was truncated.
@ -812,39 +815,14 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
break;
}
Assert(header.compressed_size <= BLCKSZ);
Assert(compressed_size <= BLCKSZ);
/* read a page from file */
read_len = fread(compressed_page.data, 1,
MAXALIGN(header.compressed_size), in);
if (read_len != MAXALIGN(header.compressed_size))
read_len = fread(page.data, 1,
MAXALIGN(compressed_size), in);
if (read_len != MAXALIGN(compressed_size))
elog(ERROR, "Cannot read block %u of \"%s\" read %zu of %d",
blknum, file->path, read_len, header.compressed_size);
/*
* if page size is smaller than BLCKSZ, decompress the page.
* BUGFIX for versions < 2.0.23: if page size is equal to BLCKSZ.
* we have to check, whether it is compressed or not using
* page_may_be_compressed() function.
*/
if (header.compressed_size != BLCKSZ
|| page_may_be_compressed(compressed_page.data, file->compress_alg,
backup_version))
{
const char *errormsg = NULL;
uncompressed_size = do_decompress(page.data, BLCKSZ,
compressed_page.data,
header.compressed_size,
file->compress_alg, &errormsg);
if (uncompressed_size < 0 && errormsg != NULL)
elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s",
blknum, file->path, errormsg);
if (uncompressed_size != BLCKSZ)
elog(ERROR, "Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ",
file->path, uncompressed_size);
}
blknum, file->path, read_len, compressed_size);
write_pos = (write_header) ? blknum * (BLCKSZ + sizeof(header)) :
blknum * BLCKSZ;
@ -865,19 +843,22 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
blknum, file->path, strerror(errno));
}
/* if we uncompressed the page - write page.data,
* if page wasn't compressed -
* write what we've read - compressed_page.data
/*
* if page size is smaller than BLCKSZ, decompress the page.
* BUGFIX for versions < 2.0.23: if page size is equal to BLCKSZ.
* we have to check, whether it is compressed or not using
* page_may_be_compressed() function.
*/
if (uncompressed_size == BLCKSZ)
rc = (compressed_size != BLCKSZ || page_may_be_compressed(page.data, file->compress_alg, backup_version))
? fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg, &errormsg)
: fio_fwrite(out, page.data, compressed_size);
if (rc != compressed_size)
{
if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
}
else
{
if (fio_fwrite(out, compressed_page.data, BLCKSZ) != BLCKSZ)
if (errormsg != NULL)
elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s",
blknum, file->path, errormsg);
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
}

View File

@ -700,6 +700,8 @@ int32 do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo);
extern void check_system_identifiers(PGconn *conn, char *pgdata);
extern void parse_filelist_filenames(parray *files, const char *root);
extern int32 do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, const char **errormsg);
#endif /* PG_PROBACKUP_H */

View File

@ -566,6 +566,35 @@ size_t fio_fwrite(FILE* f, void const* buf, size_t size)
: fwrite(buf, 1, size, f);
}
/* Write data to stdio file */
size_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg, const char** errmsg)
{
uint32 decompressed_size;
char decompressed_page[BLCKSZ];
if (fio_is_remote_file(f))
{
fio_header hdr;
hdr.cop = FIO_WRITE_COMPRESSED;
hdr.handle = fio_fileno(f) & ~FIO_PIPE_MARKER;
hdr.size = size;
hdr.arg = compress_alg;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, buf, size), size);
return size;
}
decompressed_size = do_decompress(decompressed_page,
BLCKSZ,
buf,
size,
(CompressAlg)compress_alg, errmsg);
return decompressed_size != BLCKSZ
? 0 : fwrite(decompressed_page, 1, decompressed_size, f);
}
/* Write data to the file */
ssize_t fio_write(int fd, void const* buf, size_t size)
{
@ -1381,6 +1410,22 @@ void fio_communicate(int in, int out)
case FIO_WRITE: /* Write to the current position in file */
IO_CHECK(fio_write_all(fd[hdr.handle], buf, hdr.size), hdr.size);
break;
case FIO_WRITE_COMPRESSED: /* Write to the current position in file */
{
char decompressed_page[BLCKSZ];
char const* errmsg = NULL;
int32 decompressed_size = do_decompress(decompressed_page, BLCKSZ,
buf,
hdr.size,
(CompressAlg)hdr.arg, &errmsg);
if (errmsg != NULL || decompressed_size != BLCKSZ)
{
fprintf(stderr, "Failed to decompress block: %s", errmsg ? errmsg: "unknown error");
exit(EXIT_FAILURE);
}
IO_CHECK(fio_write_all(fd[hdr.handle], decompressed_page, BLCKSZ), BLCKSZ);
}
break;
case FIO_READ: /* Read from the current position in file */
if ((size_t)hdr.arg > buf_size) {
buf_size = hdr.arg;

View File

@ -33,7 +33,8 @@ typedef enum
FIO_READDIR,
FIO_CLOSEDIR,
FIO_SEND_PAGES,
FIO_PAGE
FIO_PAGE,
FIO_WRITE_COMPRESSED,
} fio_operations;
typedef enum
@ -69,6 +70,7 @@ extern void fio_communicate(int in, int out);
extern FILE* fio_fopen(char const* name, char const* mode, fio_location location);
extern size_t fio_fwrite(FILE* f, void const* buf, size_t size);
extern size_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg, const char** errmsg);
extern ssize_t fio_fread(FILE* f, void* buf, size_t size);
extern int fio_pread(FILE* f, void* buf, off_t offs);
extern int fio_fprintf(FILE* f, char const* arg, ...) pg_attribute_printf(2, 3);