1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-03 14:01:57 +02:00

Support compression

This commit is contained in:
Konstantin Knizhnik 2018-11-05 23:18:44 +03:00
parent 9a4780700c
commit 6b029ebfdc
5 changed files with 113 additions and 40 deletions

View File

@ -989,7 +989,7 @@ copy_meta(const char *from_path, const char *to_path, bool unlink_on_error, fio_
if (stat(from_path, &st) == -1)
{
if (unlink_on_error)
unlink(to_path);
fio_unlink(to_path, location);
elog(ERROR, "Cannot stat file \"%s\": %s",
from_path, strerror(errno));
}
@ -997,7 +997,7 @@ copy_meta(const char *from_path, const char *to_path, bool unlink_on_error, fio_
if (fio_chmod(to_path, st.st_mode, location) == -1)
{
if (unlink_on_error)
unlink(to_path);
fio_unlink(to_path, location);
elog(ERROR, "Cannot change mode of file \"%s\": %s",
to_path, strerror(errno));
}
@ -1020,6 +1020,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
#ifdef HAVE_LIBZ
char gz_to_path[MAXPGPATH];
gzFile gz_out = NULL;
int gz_tmp = -1;
#endif
/* open file for read */
@ -1039,7 +1040,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path);
gz_out = gzopen(to_path_temp, PG_BINARY_W);
gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, &gz_tmp, FIO_BACKUP_HOST);
if (gzsetparams(gz_out, compress_level, Z_DEFAULT_STRATEGY) != Z_OK)
elog(ERROR, "Cannot set compression level %d to file \"%s\": %s",
compress_level, to_path_temp, get_gz_error(gz_out, errno));
@ -1070,7 +1071,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
if (ferror(in))
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR,
"Cannot read source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
@ -1084,7 +1085,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
if (gzwrite(gz_out, buf, read_len) != read_len)
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s",
to_path_temp, get_gz_error(gz_out, errno_temp));
}
@ -1095,7 +1096,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
if (fio_fwrite(out, buf, read_len) != read_len)
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write to WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
@ -1109,10 +1110,10 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
#ifdef HAVE_LIBZ
if (is_compress)
{
if (gzclose(gz_out) != 0)
if (fio_gzclose(gz_out, to_path_temp, gz_tmp) != 0)
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
to_path_temp, get_gz_error(gz_out, errno_temp));
}
@ -1124,7 +1125,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
fio_fclose(out))
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
@ -1133,7 +1134,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
if (fclose(in))
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
@ -1144,7 +1145,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0)
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s",
to_path_temp, to_path_p, strerror(errno_temp));
}
@ -1229,7 +1230,7 @@ get_wal_file(const char *from_path, const char *to_path)
if (read_len != sizeof(buf) && !gzeof(gz_in))
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot read compressed WAL file \"%s\": %s",
gz_from_path, get_gz_error(gz_in, errno_temp));
}
@ -1241,7 +1242,7 @@ get_wal_file(const char *from_path, const char *to_path)
if (ferror(in))
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot read source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
@ -1252,7 +1253,7 @@ get_wal_file(const char *from_path, const char *to_path)
if (fio_fwrite(out, buf, read_len) != read_len)
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot write to WAL file \"%s\": %s", to_path_temp,
strerror(errno_temp));
}
@ -1277,7 +1278,7 @@ get_wal_file(const char *from_path, const char *to_path)
fio_fclose(out))
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot write WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
@ -1288,7 +1289,7 @@ get_wal_file(const char *from_path, const char *to_path)
if (gzclose(gz_in) != 0)
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
gz_from_path, get_gz_error(gz_in, errno_temp));
}
@ -1299,7 +1300,7 @@ get_wal_file(const char *from_path, const char *to_path)
if (fclose(in))
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
@ -1311,7 +1312,7 @@ get_wal_file(const char *from_path, const char *to_path)
if (fio_rename(to_path_temp, to_path, FIO_DB_HOST) < 0)
{
errno_temp = errno;
unlink(to_path_temp);
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s",
to_path_temp, to_path, strerror(errno_temp));
}

View File

@ -102,6 +102,7 @@ typedef struct XLogPageReadPrivate
#ifdef HAVE_LIBZ
gzFile gz_xlogfile;
char gz_xlogpath[MAXPGPATH];
int gz_tmp;
#endif
} XLogPageReadPrivate;
@ -885,8 +886,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
private_data->thread_num, private_data->gz_xlogpath);
private_data->xlogexists = true;
private_data->gz_xlogfile = gzopen(private_data->gz_xlogpath,
"rb");
private_data->gz_xlogfile = fio_gzopen(private_data->gz_xlogpath,
"rb", &private_data->gz_tmp, private_data->location);
if (private_data->gz_xlogfile == NULL)
{
elog(WARNING, "Thread [%d]: Could not open compressed WAL segment \"%s\": %s",
@ -1000,7 +1001,7 @@ CleanupXLogPageRead(XLogReaderState *xlogreader)
#ifdef HAVE_LIBZ
else if (private_data->gz_xlogfile != NULL)
{
gzclose(private_data->gz_xlogfile);
fio_gzclose(private_data->gz_xlogfile, private_data->gz_xlogpath, private_data->gz_tmp);
private_data->gz_xlogfile = NULL;
}
#endif

View File

@ -36,7 +36,8 @@ static bool fio_is_remote_fd(int fd)
static bool fio_is_remote(fio_location location)
{
return (location == FIO_BACKUP_HOST && is_remote_agent)
return location == FIO_REMOTE_HOST
|| (location == FIO_BACKUP_HOST && is_remote_agent)
|| (location == FIO_DB_HOST && !is_remote_agent && ssh_host != NULL);
}
@ -581,6 +582,61 @@ int fio_chmod(char const* path, int mode, fio_location location)
}
}
#ifdef HAVE_LIBZ
gzFile fio_gzopen(char const* path, char const* mode, int* tmp_fd, fio_location location)
{
gzFile file;
if (fio_is_remote(location))
{
int fd = mkstemp("gz.XXXXXX");
if (fd < 0)
return NULL;
*tmp_fd = fd;
file = gzdopen(fd, mode);
}
else
{
*tmp_fd = -1;
file = gzopen(path, mode);
}
return file;
}
int fio_gzclose(gzFile file, char const* path, int tmp_fd)
{
if (tmp_fd >= 0)
{
off_t size;
void* buf;
int fd;
SYS_CHECK(gzflush(file, Z_FINISH));
size = lseek(tmp_fd, 0, SEEK_END);
buf = malloc(size);
lseek(tmp_fd, 0, SEEK_SET);
IO_CHECK(read(tmp_fd, buf, size), size);
SYS_CHECK(gzclose(file)); /* should close tmp_fd */
fd = fio_open(path, O_RDWR|O_CREAT|O_TRUNC, FILE_PERMISSIONS);
if (fd < 0) {
free(buf);
return -1;
}
IO_CHECK(fio_write(fd, buf, size), size);
free(buf);
return fio_close(fd);
}
else
{
return gzclose(file);
}
}
#endif
static void fio_send_file(int out, char const* path)
{
int fd = open(path, O_RDONLY);

View File

@ -3,6 +3,10 @@
#include <stdio.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
typedef enum
{
FIO_OPEN,
@ -27,7 +31,8 @@ typedef enum
{
FIO_LOCAL_HOST,
FIO_DB_HOST,
FIO_BACKUP_HOST
FIO_BACKUP_HOST,
FIO_REMOTE_HOST
} fio_location;
#define FIO_FDMAX 64
@ -82,6 +87,11 @@ extern int fio_stat(char const* path, struct stat* st, bool follow_symlinks,
extern FILE* fio_open_stream(char const* name, fio_location location);
extern int fio_close_stream(FILE* f);
#ifdef HAVE_LIBZ
extern gzFile fio_gzopen(char const* path, char const* mode, int* tmp_fd, fio_location location);
extern int fio_gzclose(gzFile file, char const* path, int tmp_fd);
#endif
extern void fio_transfer(void* addr, size_t value);
#endif

View File

@ -61,6 +61,7 @@ typedef struct DirectoryMethodFile
char *temp_suffix;
#ifdef HAVE_LIBZ
gzFile gzfp;
int gz_tmp;
#endif
} DirectoryMethodFile;
@ -74,11 +75,12 @@ dir_getlasterror(void)
static Walfile
dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
{
static char tmppath[MAXPGPATH];
int fd;
char tmppath[MAXPGPATH];
int fd = -1;
DirectoryMethodFile *f;
#ifdef HAVE_LIBZ
gzFile gzfp = NULL;
int gz_tmp = -1;
#endif
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
@ -92,17 +94,12 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
* does not do any system calls to fsync() to make changes permanent on
* disk.
*/
fd = fio_open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, FIO_BACKUP_HOST);
if (fd < 0)
return NULL;
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
{
gzfp = gzdopen(fd, "wb");
gzfp = fio_gzopen(tmppath, "wb", &gz_tmp, FIO_BACKUP_HOST);
if (gzfp == NULL)
{
close(fd);
return NULL;
}
@ -113,8 +110,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
return NULL;
}
}
else
#endif
{
fd = fio_open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, FIO_BACKUP_HOST);
if (fd < 0)
return NULL;
}
/* Do pre-padding on non-compressed files */
if (pad_to_size && dir_data->compression == 0)
{
@ -173,7 +175,10 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
f = pg_malloc0(sizeof(DirectoryMethodFile));
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
{
f->gzfp = gzfp;
f->gz_tmp = gz_tmp;
}
#endif
f->fd = fd;
f->currpos = 0;
@ -218,14 +223,14 @@ dir_close(Walfile f, WalCloseMethod method)
{
int r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
static char tmppath[MAXPGPATH];
static char tmppath2[MAXPGPATH];
char tmppath[MAXPGPATH];
char tmppath2[MAXPGPATH];
Assert(f != NULL);
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
r = gzclose(df->gzfp);
r = fio_gzclose(df->gzfp, df->fullpath, df->gz_tmp);
else
#endif
r = fio_close(df->fd);
@ -248,7 +253,7 @@ dir_close(Walfile f, WalCloseMethod method)
snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
dir_data->basedir, df->pathname,
dir_data->compression > 0 ? ".gz" : "");
r = durable_rename(tmppath, tmppath2, progname);
r = fio_rename(tmppath, tmppath2, FIO_BACKUP_HOST);
file_path = tmppath2;
}
else if (method == CLOSE_UNLINK)
@ -258,7 +263,7 @@ dir_close(Walfile f, WalCloseMethod method)
dir_data->basedir, df->pathname,
dir_data->compression > 0 ? ".gz" : "",
df->temp_suffix ? df->temp_suffix : "");
r = unlink(tmppath);
r = fio_unlink(tmppath, FIO_BACKUP_HOST);
}
else
{
@ -267,7 +272,7 @@ dir_close(Walfile f, WalCloseMethod method)
* CLOSE_NO_RENAME. In this case, fsync the file and containing
* directory if sync mode is requested.
*/
file_path = df->fullpath;
file_path = df->fullpath;
if (dir_data->sync && !is_remote_agent)
{
r = fsync_fname(df->fullpath, false, progname);
@ -315,7 +320,7 @@ static ssize_t
dir_get_file_size(const char *pathname)
{
struct stat statbuf;
static char tmppath[MAXPGPATH];
char tmppath[MAXPGPATH];
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
@ -329,7 +334,7 @@ dir_get_file_size(const char *pathname)
static bool
dir_existsfile(const char *pathname)
{
static char tmppath[MAXPGPATH];
char tmppath[MAXPGPATH];
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);