1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-10 14:37:03 +02:00

Implement backup data compression. Add options --compress-algorithm, --compress-level

This commit is contained in:
Anastasia 2017-06-05 12:09:07 +03:00
parent 94d4986356
commit 310882b539
6 changed files with 238 additions and 18 deletions

View File

@ -10,6 +10,7 @@
#include "pg_probackup.h"
static void opt_log_level(pgut_option *opt, const char *arg);
static void opt_compress_alg(pgut_option *opt, const char *arg);
static pgBackupConfig *cur_config = NULL;
@ -47,6 +48,11 @@ do_configure(bool show_only)
if (retention_window)
config->retention_window = retention_window;
if (compress_alg != NOT_DEFINED_COMPRESS)
config->compress_alg = compress_alg;
if (compress_level != -1)
config->compress_level = compress_level;
if (show_only)
writeBackupCatalogConfig(stderr, config);
else
@ -74,6 +80,9 @@ pgBackupConfigInit(pgBackupConfig *config)
config->retention_redundancy = 0;
config->retention_window = 0;
config->compress_alg = NOT_DEFINED_COMPRESS;
config->compress_level = -1;
}
void
@ -113,6 +122,15 @@ writeBackupCatalogConfig(FILE *out, pgBackupConfig *config)
if (config->retention_window)
fprintf(out, "retention-window = %u\n", config->retention_window);
fprintf(out, "#Compression parameters:\n");
fprintf(out, "compress-algorithm = %s\n", deparse_compress_alg(config->compress_alg));
/* if none value is set, print default */
if (config->compress_level == -1)
fprintf(out, "compress-level = %u\n", DEFAULT_COMPRESS_LEVEL);
else
fprintf(out, "compress-level = %u\n", config->compress_level);
}
void
@ -143,6 +161,9 @@ readBackupCatalogConfigFile(void)
/* retention options */
{ 'u', 0, "retention-redundancy", &(config->retention_redundancy),SOURCE_FILE_STRICT },
{ 'u', 0, "retention-window", &(config->retention_window), SOURCE_FILE_STRICT },
/* compression options */
{ 'f', 36, "compress-algorithm", opt_compress_alg, SOURCE_CMDLINE },
{ 'u', 37, "compress-level", &(config->compress_level), SOURCE_CMDLINE },
/* logging options */
{ 'f', 40, "log-level", opt_log_level, SOURCE_CMDLINE },
{ 's', 41, "log-filename", &(config->log_filename), SOURCE_CMDLINE },
@ -177,3 +198,9 @@ opt_log_level(pgut_option *opt, const char *arg)
{
cur_config->log_level = parse_log_level(arg);
}
static void
opt_compress_alg(pgut_option *opt, const char *arg)
{
cur_config->compress_alg = parse_compress_alg(arg);
}

123
data.c
View File

@ -19,10 +19,63 @@
#include "storage/block.h"
#include "storage/bufpage.h"
#include "storage/checksum_impl.h"
#include <common/pg_lzcompress.h>
#include <zlib.h>
static size_t zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size)
{
uLongf compressed_size = dst_size;
int rc = compress2(dst, &compressed_size, src, src_size, compress_level);
return rc == Z_OK ? compressed_size : rc;
}
static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size)
{
uLongf dest_len = dst_size;
int rc = uncompress(dst, &dest_len, src, src_size);
return rc == Z_OK ? dest_len : rc;
}
static size_t
do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
{
switch (alg)
{
case NONE_COMPRESS:
case NOT_DEFINED_COMPRESS:
return -1;
case ZLIB_COMPRESS:
return zlib_compress(dst, dst_size, src, src_size);
case PGLZ_COMPRESS:
return pglz_compress(src, src_size, dst, PGLZ_strategy_always);
}
return -1;
}
static size_t
do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
{
switch (alg)
{
case NONE_COMPRESS:
case NOT_DEFINED_COMPRESS:
return -1;
case ZLIB_COMPRESS:
return zlib_decompress(dst, dst_size, src, src_size);
case PGLZ_COMPRESS:
return pglz_decompress(src, src_size, dst, dst_size);
}
return -1;
}
typedef struct BackupPageHeader
{
BlockNumber block; /* block number */
int32 compressed_size;
} BackupPageHeader;
/* Verify page's header */
@ -61,8 +114,10 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
BackupPageHeader header;
off_t offset;
DataPage page; /* used as read buffer */
size_t write_buffer_size = sizeof(header) + BLCKSZ;
char write_buffer[write_buffer_size];
DataPage compressed_page; /* used as read buffer */
size_t write_buffer_size;
/* maximum size of write buffer */
char write_buffer[BLCKSZ+sizeof(header)];
size_t read_len = 0;
XLogRecPtr page_lsn;
int try_checksum = 100;
@ -162,9 +217,31 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
file->read_size += read_len;
memcpy(write_buffer, &header, sizeof(header));
/* TODO implement block compression here? */
memcpy(write_buffer + sizeof(header), page.data, BLCKSZ);
header.compressed_size = do_compress(compressed_page.data, sizeof(compressed_page.data),
page.data, sizeof(page.data), compress_alg);
file->compress_alg = compress_alg;
Assert (header.compressed_size <= BLCKSZ);
write_buffer_size = sizeof(header);
if (header.compressed_size > 0)
{
memcpy(write_buffer, &header, sizeof(header));
memcpy(write_buffer + sizeof(header), compressed_page.data, header.compressed_size);
write_buffer_size += MAXALIGN(header.compressed_size);
}
else
{
header.compressed_size = BLCKSZ;
memcpy(write_buffer, &header, sizeof(header));
memcpy(write_buffer + sizeof(header), page.data, BLCKSZ);
write_buffer_size += header.compressed_size;
}
/* Update CRC */
COMP_CRC32C(*crc, &write_buffer, write_buffer_size);
/* write data page */
if(fwrite(write_buffer, 1, write_buffer_size, out) != write_buffer_size)
{
@ -175,10 +252,6 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
file->path, blknum, strerror(errno_tmp));
}
/* update CRC */
COMP_CRC32C(*crc, &header, sizeof(header));
COMP_CRC32C(*crc, page.data, BLCKSZ);
file->write_size += write_buffer_size;
}
@ -468,7 +541,8 @@ restore_data_file(const char *from_root,
for (blknum = 0; ; blknum++)
{
size_t read_len;
DataPage page; /* used as read buffer */
DataPage compressed_page; /* used as read buffer */
DataPage page;
/* read BackupPageHeader */
read_len = fread(&header, 1, sizeof(header), in);
@ -482,18 +556,35 @@ restore_data_file(const char *from_root,
"odd size page found at block %u of \"%s\"",
blknum, file->path);
else
elog(ERROR, "cannot read block %u of \"%s\": %s",
elog(ERROR, "cannot read header of block %u of \"%s\": %s",
blknum, file->path, strerror(errno_tmp));
}
if (header.block < blknum)
elog(ERROR, "backup is broken at block %u",
blknum);
elog(ERROR, "backup is broken at block %u", blknum);
/* TODO fix this assert */
Assert (header.compressed_size <= BLCKSZ);
if (fread(page.data, 1, BLCKSZ, in) != BLCKSZ)
elog(ERROR, "cannot read block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
read_len = fread(compressed_page.data, 1,
MAXALIGN(header.compressed_size), in);
if (read_len != MAXALIGN(header.compressed_size))
elog(ERROR, "cannot read block %u of \"%s\" read %lu of %d",
blknum, file->path, read_len, header.compressed_size);
if (header.compressed_size < BLCKSZ)
{
size_t uncompressed_size = 0;
uncompressed_size = do_decompress(page.data, BLCKSZ,
compressed_page.data,
header.compressed_size, file->compress_alg);
if (uncompressed_size != BLCKSZ)
elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ", uncompressed_size);
}
else
memcpy(page.data, compressed_page.data, BLCKSZ);
/* update checksum because we are not save whole */
if(backup->checksum_version)

8
dir.c
View File

@ -156,6 +156,7 @@ pgFileInit(const char *path)
strcpy(file->path, path); /* enough buffer size guaranteed */
file->generation = -1;
file->is_partial_copy = 0;
file->compress_alg = NOT_DEFINED_COMPRESS;
return file;
}
@ -671,9 +672,9 @@ print_file_list(FILE *out, const parray *files, const char *root)
path = GetRelativePath(path, root);
fprintf(out, "{\"path\":\"%s\", \"size\":\"%lu\",\"mode\":\"%u\","
"\"is_datafile\":\"%u\", \"crc\":\"%u\"",
"\"is_datafile\":\"%u\", \"crc\":\"%u\", \"compress_alg\":\"%s\"",
path, (unsigned long) file->write_size, file->mode,
file->is_datafile?1:0, file->crc);
file->is_datafile?1:0, file->crc, deparse_compress_alg(file->compress_alg));
if (file->is_datafile)
fprintf(out, ",\"segno\":\"%d\"", file->segno);
@ -847,6 +848,7 @@ dir_read_file_list(const char *root, const char *file_txt)
char path[MAXPGPATH];
char filepath[MAXPGPATH];
char linked[MAXPGPATH];
char compress_alg_string[MAXPGPATH];
uint64 write_size,
mode, /* bit length of mode_t depends on platforms */
is_datafile,
@ -867,6 +869,7 @@ dir_read_file_list(const char *root, const char *file_txt)
/* optional fields */
get_control_value(buf, "linked", linked, NULL, false);
get_control_value(buf, "segno", NULL, &segno, false);
get_control_value(buf, "compress_alg", compress_alg_string, NULL, false);
#ifdef PGPRO_EE
get_control_value(buf, "CFS_generation", NULL, &generation, true);
@ -883,6 +886,7 @@ dir_read_file_list(const char *root, const char *file_txt)
file->mode = (mode_t) mode;
file->is_datafile = is_datafile ? true : false;
file->crc = (pg_crc32) crc;
file->compress_alg = parse_compress_alg(compress_alg_string);
if (linked[0])
file->linked = pgut_strdup(linked);
file->segno = (int) segno;

18
help.c
View File

@ -75,12 +75,16 @@ help_pg_probackup(void)
printf(_(" [--log-rotation-age=log-rotation-age]\n"));
printf(_(" [--retention-redundancy=retention-redundancy]\n"));
printf(_(" [--retention-window=retention-window]\n"));
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
printf(_(" [--compress-level=compress-level]\n"));
printf(_("\n %s show-config -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n %s backup -B backup-path -b backup-mode --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [-D pgdata-dir] [-C] [--stream [-S slot-name]] [--backup-pg-log]\n"));
printf(_(" [-j num-threads] [--archive-timeout=archive-timeout]\n"));
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
printf(_(" [--compress-level=compress-level]\n"));
printf(_(" [--progress] [--delete-expired]\n"));
printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n"));
@ -132,6 +136,8 @@ help_backup(void)
printf(_(" [-D pgdata-dir] [-C] [--stream [-S slot-name]] [--backup-pg-log]\n"));
printf(_(" [-j num-threads] [--archive-timeout=archive-timeout]\n"));
printf(_(" [--progress] [--delete-expired]\n"));
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
printf(_(" [--compress-level=compress-level]\n"));
printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n\n"));
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
@ -148,6 +154,11 @@ help_backup(void)
printf(_(" --delete-expired delete backups expired according to current\n"));
printf(_(" retention policy after successful backup completion\n"));
printf(_("\n Compression options:\n"));
printf(_(" --compress-algorithm=compress-algorithm\n"));
printf(_(" available options: 'zlib','pglz','none'\n"));
printf(_(" --compress-level=compress-level level of compression [0-9]\n"));
printf(_("\n Connection options:\n"));
printf(_(" -d, --dbname=DBNAME database to connect\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
@ -236,6 +247,8 @@ help_set_config(void)
printf(_(" [--log-rotation-age=log-rotation-age]\n"));
printf(_(" [--retention-redundancy=retention-redundancy]\n"));
printf(_(" [--retention-window=retention-window]\n\n"));
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
printf(_(" [--compress-level=compress-level]\n"));
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
printf(_(" --instance=instance_name name of the instance\n"));
@ -264,6 +277,11 @@ help_set_config(void)
printf(_(" --retention-window=retention-window\n"));
printf(_(" number of days of recoverability\n"));
printf(_("\n Compression options:\n"));
printf(_(" --compress-algorithm=compress-algorithm\n"));
printf(_(" available options: 'zlib','pglz','none'\n"));
printf(_(" --compress-level=compress-level\n"));
printf(_(" level of compression [0-9]\n"));
}
static void

View File

@ -63,6 +63,9 @@ uint64 system_identifier = 0;
uint32 retention_redundancy = 0;
uint32 retention_window = 0;
CompressAlg compress_alg = NOT_DEFINED_COMPRESS;
int compress_level = -1;
/* restore configuration */
static char *target_time;
static char *target_xid;
@ -75,6 +78,7 @@ static char *wal_file_name;
static void opt_backup_mode(pgut_option *opt, const char *arg);
static void opt_log_level(pgut_option *opt, const char *arg);
static void opt_compress_alg(pgut_option *opt, const char *arg);
static pgut_option options[] =
{
@ -109,6 +113,9 @@ static pgut_option options[] =
/* retention options */
{ 'u', 34, "retention-redundancy", &retention_redundancy, SOURCE_CMDLINE },
{ 'u', 35, "retention-window", &retention_window, SOURCE_CMDLINE },
/* compression options */
{ 'f', 36, "compress-algorithm", opt_compress_alg, SOURCE_CMDLINE },
{ 'u', 37, "compress-level", &compress_level, SOURCE_CMDLINE },
/* logging options */
{ 'f', 40, "log-level", opt_log_level, SOURCE_CMDLINE },
{ 's', 41, "log-filename", &log_filename, SOURCE_CMDLINE },
@ -330,6 +337,10 @@ main(int argc, char *argv[])
if (num_threads < 1)
num_threads = 1;
if (compress_level != -1 &&
(compress_level < 0 || compress_level > 9))
elog(ERROR, "--compress-level value must be in the range from 0 to 9");
/* do actual operation */
switch (backup_subcmd)
{
@ -389,3 +400,51 @@ opt_log_level(pgut_option *opt, const char *arg)
log_level = parse_log_level(arg);
log_level_defined = true;
}
CompressAlg
parse_compress_alg(const char *arg)
{
size_t len;
/* Skip all spaces detected */
while (isspace((unsigned char)*arg))
arg++;
len = strlen(arg);
if (len == 0)
elog(ERROR, "compress algrorithm is empty");
if (pg_strncasecmp("zlib", arg, len) == 0)
return ZLIB_COMPRESS;
else if (pg_strncasecmp("pglz", arg, len) == 0)
return PGLZ_COMPRESS;
else if (pg_strncasecmp("none", arg, len) == 0)
return NONE_COMPRESS;
else
elog(ERROR, "invalid compress algorithm value \"%s\"", arg);
return NOT_DEFINED_COMPRESS;
}
const char*
deparse_compress_alg(int alg)
{
switch (alg)
{
case NONE_COMPRESS:
case NOT_DEFINED_COMPRESS:
return "none";
case ZLIB_COMPRESS:
return "zlib";
case PGLZ_COMPRESS:
return "pglz";
}
return NULL;
}
void
opt_compress_alg(pgut_option *opt, const char *arg)
{
compress_alg = parse_compress_alg(arg);
}

View File

@ -56,6 +56,14 @@
#define XID_FMT "%u"
#endif
typedef enum CompressAlg
{
NOT_DEFINED_COMPRESS = 0,
NONE_COMPRESS,
PGLZ_COMPRESS,
ZLIB_COMPRESS,
} CompressAlg;
/* Information about single file (or dir) in backup */
typedef struct pgFile
{
@ -77,6 +85,7 @@ typedef struct pgFile
* we cannot backup compressed file partially. */
int is_partial_copy; /* for compressed files. Set to '1' if backed up
* via copy_file_partly() */
CompressAlg compress_alg; /* compression algorithm applied to the file */
volatile uint32 lock; /* lock for synchronization of parallel threads */
datapagemap_t pagemap; /* bitmap of pages updated since previous backup */
} pgFile;
@ -118,6 +127,7 @@ typedef enum ProbackupSubcmd
SHOW_CONFIG
} ProbackupSubcmd;
/* special values of pgBackup fields */
#define INVALID_BACKUP_ID 0
#define BYTES_INVALID (-1)
@ -140,6 +150,9 @@ typedef struct pgBackupConfig
uint32 retention_redundancy;
uint32 retention_window;
CompressAlg compress_alg;
int compress_level;
} pgBackupConfig;
/* Information about single backup stored in backup.conf */
@ -261,6 +274,14 @@ extern uint64 system_identifier;
extern uint32 retention_redundancy;
extern uint32 retention_window;
extern CompressAlg compress_alg;
extern int compress_level;
#define DEFAULT_COMPRESS_LEVEL 6
extern CompressAlg parse_compress_alg(const char *arg);
extern const char* deparse_compress_alg(int alg);
/* in dir.c */
/* exclude directory list for $PGDATA file listing */
extern const char *pgdata_exclude_dir[];