1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-07 13:40:17 +02:00

Decompress on agent's size

This commit is contained in:
Konstantin Knizhnik 2020-02-06 14:39:52 +03:00 committed by Grigory Smolkin
parent f165001c8d
commit 7d78d501a5
4 changed files with 95 additions and 39 deletions

View File

@ -89,7 +89,7 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
* Decompresses source into dest using algorithm. Returns the number of bytes
* decompressed in the destination buffer, or -1 if decompression fails.
*/
static int32
int32
do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, const char **errormsg)
{
@ -1047,9 +1047,9 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
{
off_t write_pos;
size_t read_len;
DataPage compressed_page; /* used as read buffer */
DataPage page;
int32 uncompressed_size = 0;
int32 compressed_size = 0;
bool is_compressed = false;
/* check for interrupt */
if (interrupted || thread_interrupted)
@ -1090,14 +1090,16 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
* n_blocks attribute was available only in DELTA backups.
* File truncate in PAGE and PTRACK happened on the fly when
* special value PageIsTruncated is encountered.
* It is inefficient.
* It was inefficient.
*
* Nowadays every backup type has n_blocks, so instead
* writing and then truncating redundant data, writing
* is not happening in the first place.
* TODO: remove in 3.0.0
*/
if (header.compressed_size == PageIsTruncated)
compressed_size = header.compressed_size;
if (compressed_size == PageIsTruncated)
{
/*
* Block header contains information that this block was truncated.
@ -1124,16 +1126,15 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
if (nblocks > 0 && blknum >= nblocks)
break;
if (header.compressed_size > BLCKSZ)
if (compressed_size > BLCKSZ)
elog(ERROR, "Size of a blknum %i exceed BLCKSZ", blknum);
/* read a page from file */
read_len = fread(compressed_page.data, 1,
MAXALIGN(header.compressed_size), in);
read_len = fread(page.data, 1, MAXALIGN(compressed_size), in);
if (read_len != MAXALIGN(header.compressed_size))
if (read_len != MAXALIGN(compressed_size))
elog(ERROR, "Cannot read block %u of \"%s\", read %zu of %d",
blknum, from_fullpath, read_len, header.compressed_size);
blknum, from_fullpath, read_len, compressed_size);
/*
* if page size is smaller than BLCKSZ, decompress the page.
@ -1142,23 +1143,10 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
* page_may_be_compressed() function.
*/
if (header.compressed_size != BLCKSZ
|| page_may_be_compressed(compressed_page.data, file->compress_alg,
|| page_may_be_compressed(page.data, file->compress_alg,
backup_version))
{
const char *errormsg = NULL;
uncompressed_size = do_decompress(page.data, BLCKSZ,
compressed_page.data,
header.compressed_size,
file->compress_alg, &errormsg);
if (uncompressed_size < 0 && errormsg != NULL)
elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s",
blknum, from_fullpath, errormsg);
if (uncompressed_size != BLCKSZ)
elog(ERROR, "Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ",
from_fullpath, uncompressed_size);
is_compressed = true;
}
write_pos = blknum * BLCKSZ;
@ -1170,19 +1158,21 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
/* if we uncompressed the page - write page.data,
* if page wasn't compressed -
* write what we've read - compressed_page.data
/* If page is compressed and restore is in remote mode, send compressed
* page to the remote side.
*/
if (uncompressed_size == BLCKSZ)
if (is_compressed)
{
if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
ssize_t rc;
rc = fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg);
if (!fio_is_remote_file(out) && rc != BLCKSZ)
elog(ERROR, "Cannot write block %u of \"%s\": %s, size: %u",
blknum, to_fullpath, strerror(errno), compressed_size);
}
else
{
if (fio_fwrite(out, compressed_page.data, BLCKSZ) != BLCKSZ)
if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}

View File

@ -216,8 +216,8 @@ typedef enum ShowFormat
#define BYTES_INVALID (-1) /* file didn`t changed since previous backup, DELTA backup do not rely on it */
#define FILE_NOT_FOUND (-2) /* file disappeared during backup */
#define BLOCKNUM_INVALID (-1)
#define PROGRAM_VERSION "2.2.7"
#define AGENT_PROTOCOL_VERSION 20207
#define PROGRAM_VERSION "2.2.8"
#define AGENT_PROTOCOL_VERSION 20208
typedef struct ConnectionOptions
@ -898,8 +898,10 @@ extern long unsigned int base36dec(const char *text);
extern uint32 parse_server_version(const char *server_version_str);
extern uint32 parse_program_version(const char *program_version);
extern bool parse_page(Page page, XLogRecPtr *lsn);
int32 do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, int level, const char **errormsg);
extern int32 do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, int level, const char **errormsg);
extern int32 do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, const char **errormsg);
extern void pretty_size(int64 size, char *buf, size_t len);
extern void pretty_time_interval(int64 num_seconds, char *buf, size_t len);

View File

@ -55,7 +55,7 @@ void fio_error(int rc, int size, char const* file, int line)
{
if (remote_agent)
{
fprintf(stderr, "%s:%d: proceeds %d bytes instead of %d: %s\n", file, line, rc, size, rc >= 0 ? "end of data" : strerror(errno));
fprintf(stderr, "%s:%d: processed %d bytes instead of %d: %s\n", file, line, rc, size, rc >= 0 ? "end of data" : strerror(errno));
exit(EXIT_FAILURE);
}
else
@ -611,6 +611,65 @@ ssize_t fio_write(int fd, void const* buf, size_t size)
}
}
static int32
fio_decompress(void* dst, void const* src, size_t size, int compress_alg)
{
const char *errormsg = NULL;
int32 uncompressed_size = do_decompress(dst, BLCKSZ,
src,
size,
compress_alg, &errormsg);
if (uncompressed_size < 0 && errormsg != NULL)
{
elog(WARNING, "An error occured during decompressing block: %s", errormsg);
return -1;
}
if (uncompressed_size != BLCKSZ)
{
elog(ERROR, "Page uncompressed to %d bytes != BLCKSZ",
uncompressed_size);
return -1;
}
return uncompressed_size;
}
/* Write data to the file */
ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg)
{
if (fio_is_remote_file(f))
{
fio_header hdr;
hdr.cop = FIO_WRITE_COMPRESSED;
hdr.handle = fio_fileno(f) & ~FIO_PIPE_MARKER;
hdr.size = size;
hdr.arg = compress_alg;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, buf, size), size);
return size;
}
else
{
char uncompressed_buf[BLCKSZ];
int32 uncompressed_size = fio_decompress(uncompressed_buf, buf, size, compress_alg);
return (uncompressed_size < 0)
? uncompressed_size
: fwrite(uncompressed_buf, 1, uncompressed_size, f);
}
}
static ssize_t
fio_write_compressed_impl(int fd, void const* buf, size_t size, int compress_alg)
{
char uncompressed_buf[BLCKSZ];
int32 uncompressed_size = fio_decompress(uncompressed_buf, buf, size, compress_alg);
return fio_write_all(fd, uncompressed_buf, uncompressed_size);
}
/* Read data from stdio file */
ssize_t fio_fread(FILE* f, void* buf, size_t size)
{
@ -1447,6 +1506,9 @@ void fio_communicate(int in, int out)
case FIO_WRITE: /* Write to the current position in file */
IO_CHECK(fio_write_all(fd[hdr.handle], buf, hdr.size), hdr.size);
break;
case FIO_WRITE_COMPRESSED: /* Write to the current position in file */
IO_CHECK(fio_write_compressed_impl(fd[hdr.handle], buf, hdr.size, hdr.arg), BLCKSZ);
break;
case FIO_READ: /* Read from the current position in file */
if ((size_t)hdr.arg > buf_size) {
buf_size = hdr.arg;

View File

@ -34,7 +34,8 @@ typedef enum
FIO_READDIR,
FIO_CLOSEDIR,
FIO_SEND_PAGES,
FIO_PAGE
FIO_PAGE,
FIO_WRITE_COMPRESSED,
} fio_operations;
typedef enum
@ -70,6 +71,7 @@ extern void fio_communicate(int in, int out);
extern FILE* fio_fopen(char const* name, char const* mode, fio_location location);
extern size_t fio_fwrite(FILE* f, void const* buf, size_t size);
extern ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg);
extern ssize_t fio_fread(FILE* f, void* buf, size_t size);
extern int fio_pread(FILE* f, void* buf, off_t offs);
extern int fio_fprintf(FILE* f, char const* arg, ...) pg_attribute_printf(2, 3);