1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-12-12 11:45:24 +02:00

Rewrite fio_gz* support

This commit is contained in:
Konstantin Knizhnik 2019-03-14 13:58:02 +03:00
parent f0cbfb7147
commit f025b90eb0
4 changed files with 230 additions and 78 deletions

View File

@ -1102,7 +1102,6 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
#ifdef HAVE_LIBZ
char gz_to_path[MAXPGPATH];
gzFile gz_out = NULL;
int gz_tmp = -1;
if (is_compress)
{
@ -1135,14 +1134,10 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
{
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path);
gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, &gz_tmp, FIO_BACKUP_HOST);
gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, instance_config.compress_level, FIO_BACKUP_HOST);
if (gz_out == NULL)
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
to_path_temp, strerror(errno));
if (gzsetparams(gz_out, instance_config.compress_level, Z_DEFAULT_STRATEGY) != Z_OK)
elog(ERROR, "Cannot set compression level %d to file \"%s\": %s",
instance_config.compress_level, to_path_temp,
get_gz_error(gz_out, errno));
}
else
#endif
@ -1176,7 +1171,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
#ifdef HAVE_LIBZ
if (is_compress)
{
if (gzwrite(gz_out, buf, read_len) != read_len)
if (fio_gzwrite(gz_out, buf, read_len) != read_len)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
@ -1204,7 +1199,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
#ifdef HAVE_LIBZ
if (is_compress)
{
if (fio_gzclose(gz_out, to_path_temp, gz_tmp) != 0)
if (fio_gzclose(gz_out) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
@ -1696,10 +1691,9 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed)
{
char buf [1024];
gzFile gz_in = NULL;
int gz_tmp = -1;
INIT_FILE_CRC32(true, crc2);
gz_in = fio_gzopen(path2, PG_BINARY_R, &gz_tmp, FIO_BACKUP_HOST);
gz_in = fio_gzopen(path2, PG_BINARY_R, Z_DEFAULT_COMPRESSION, FIO_BACKUP_HOST);
if (gz_in == NULL)
/* File cannot be read */
elog(ERROR,
@ -1709,20 +1703,20 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed)
for (;;)
{
size_t read_len = 0;
read_len = gzread(gz_in, buf, sizeof(buf));
if (read_len != sizeof(buf) && !gzeof(gz_in))
read_len = fio_gzread(gz_in, buf, sizeof(buf));
if (read_len != sizeof(buf) && !fio_gzeof(gz_in))
/* An error occurred while reading the file */
elog(ERROR,
"Cannot compare WAL file \"%s\" with compressed \"%s\"",
path1, path2);
COMP_FILE_CRC32(true, crc2, buf, read_len);
if (gzeof(gz_in) || read_len == 0)
if (fio_gzeof(gz_in) || read_len == 0)
break;
}
FIN_FILE_CRC32(true, crc2);
if (fio_gzclose(gz_in, path2, gz_tmp) != 0)
if (fio_gzclose(gz_in) != 0)
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
path2, get_gz_error(gz_in, errno));
}

View File

@ -108,7 +108,6 @@ typedef struct XLogPageReadPrivate
#ifdef HAVE_LIBZ
gzFile gz_xlogfile;
char gz_xlogpath[MAXPGPATH];
int gz_tmp;
#endif
} XLogPageReadPrivate;
@ -1020,7 +1019,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
private_data->xlogexists = true;
private_data->gz_xlogfile = fio_gzopen(private_data->gz_xlogpath,
"rb", &private_data->gz_tmp, private_data->location);
"rb", -1, private_data->location);
if (private_data->gz_xlogfile == NULL)
{
elog(WARNING, "Thread [%d]: Could not open compressed WAL segment \"%s\": %s",
@ -1072,7 +1071,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
#ifdef HAVE_LIBZ
else
{
if (gzseek(private_data->gz_xlogfile, (z_off_t) targetPageOff, SEEK_SET) == -1)
if (fio_gzseek(private_data->gz_xlogfile, (z_off_t) targetPageOff, SEEK_SET) == -1)
{
elog(WARNING, "Thread [%d]: Could not seek in compressed WAL segment \"%s\": %s",
private_data->thread_num,
@ -1081,7 +1080,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
return -1;
}
if (gzread(private_data->gz_xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
if (fio_gzread(private_data->gz_xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
elog(WARNING, "Thread [%d]: Could not read from compressed WAL segment \"%s\": %s",
private_data->thread_num,
@ -1147,7 +1146,7 @@ CleanupXLogPageRead(XLogReaderState *xlogreader)
#ifdef HAVE_LIBZ
else if (private_data->gz_xlogfile != NULL)
{
fio_gzclose(private_data->gz_xlogfile, private_data->gz_xlogpath, private_data->gz_tmp);
fio_gzclose(private_data->gz_xlogfile);
private_data->gz_xlogfile = NULL;
}
#endif

View File

@ -729,51 +729,69 @@ int fio_chmod(char const* path, int mode, fio_location location)
}
#ifdef HAVE_LIBZ
/* Open compressed file. In case of remove file, it is fetched to local temporary file in read-only mode or is written
* to temoporary file and rtansfered to remote host by fio_gzclose. */
gzFile fio_gzopen(char const* path, char const* mode, int* tmp_fd, fio_location location)
#define ZLIB_BUFFER_SIZE (64*1024)
#define MAX_WBITS 15 /* 32K LZ77 window */
#define DEF_MEM_LEVEL 8
#define FIO_GZ_REMOTE_MARKER 1
typedef struct fioGZFile
{
gzFile file;
z_stream strm;
int fd;
bool compress;
bool eof;
Bytef buf[ZLIB_BUFFER_SIZE];
} fioGZFile;
gzFile
fio_gzopen(char const* path, char const* mode, int level, fio_location location)
{
int rc;
if (fio_is_remote(location))
{
char pattern1[] = "/tmp/gz.XXXXXX";
char pattern2[] = "gz.XXXXXX";
char* path = pattern1;
int fd = mkstemp(path); /* first try to create file in current directory */
if (fd < 0)
{
path = pattern2;
fd = mkstemp(path); /* if it is not possible, try to create it in /tmp/ */
if (fd < 0)
return NULL;
}
unlink(path); /* delete file on close */
fioGZFile* gz = (fioGZFile*)malloc(sizeof(fioGZFile));
memset(&gz->strm, 0, sizeof(gz->strm));
gz->eof = 0;
if (strcmp(mode, PG_BINARY_W) == 0)
if (strcmp(mode, PG_BINARY_W) == 0) /* compress */
{
*tmp_fd = fd;
gz->strm.next_out = gz->buf;
gz->strm.avail_out = ZLIB_BUFFER_SIZE;
rc = deflateInit2(&gz->strm,
level,
Z_DEFLATED,
MAX_WBITS + 16, DEF_MEM_LEVEL,
Z_DEFAULT_STRATEGY);
if (rc == Z_OK)
{
gz->compress = 1;
gz->fd = fio_open(path, O_WRONLY|O_CREAT|O_TRUNC, location);
}
}
else
{
int rd = fio_open(path, O_RDONLY|PG_BINARY, location);
struct stat st;
void* buf;
if (rd < 0) {
return NULL;
gz->strm.next_in = gz->buf;
gz->strm.avail_in = ZLIB_BUFFER_SIZE;
rc = inflateInit2(&gz->strm, 15 + 16);
gz->strm.avail_in = 0;
if (rc == Z_OK)
{
gz->compress = 0;
gz->fd = fio_open(path, O_RDONLY, location);
}
SYS_CHECK(fio_fstat(rd, &st));
buf = malloc(st.st_size);
IO_CHECK(fio_read(rd, buf, st.st_size), st.st_size);
IO_CHECK(write(fd, buf, st.st_size), st.st_size);
SYS_CHECK(fio_close(rd));
free(buf);
*tmp_fd = -1;
}
file = gzdopen(fd, mode);
if (rc != Z_OK)
{
free(gz);
return NULL;
}
return (gzFile)((size_t)gz + FIO_GZ_REMOTE_MARKER);
}
else
{
*tmp_fd = -1;
gzFile file;
if (strcmp(mode, PG_BINARY_W) == 0)
{
int fd = open(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FILE_PERMISSIONS);
@ -783,43 +801,180 @@ gzFile fio_gzopen(char const* path, char const* mode, int* tmp_fd, fio_location
}
else
file = gzopen(path, mode);
if (file != NULL && level != Z_DEFAULT_COMPRESSION)
{
if (gzsetparams(file, level, Z_DEFAULT_STRATEGY) != Z_OK)
elog(ERROR, "Cannot set compression level %d: %s",
level, strerror(errno));
}
return file;
}
return file;
}
/* Close compressed file. In case of writing remote file, content of temporary file is trasfered to remote host */
int fio_gzclose(gzFile file, char const* path, int tmp_fd)
int
fio_gzread(gzFile f, void *buf, unsigned size)
{
if (tmp_fd >= 0)
if ((size_t)f & FIO_GZ_REMOTE_MARKER)
{
off_t size;
void* buf;
int fd;
int rc;
fioGZFile* gz = (fioGZFile*)((size_t)f - FIO_GZ_REMOTE_MARKER);
SYS_CHECK(gzflush(file, Z_FINISH));
size = lseek(tmp_fd, 0, SEEK_END);
buf = malloc(size);
lseek(tmp_fd, 0, SEEK_SET);
IO_CHECK(read(tmp_fd, buf, size), size);
SYS_CHECK(gzclose(file)); /* should close tmp_fd */
fd = fio_open(path, O_RDWR|O_CREAT|O_EXCL|PG_BINARY, FILE_PERMISSIONS);
if (fd < 0) {
free(buf);
return -1;
if (gz->eof)
{
return 0;
}
gz->strm.next_out = (Bytef *)buf;
gz->strm.avail_out = size;
while (1)
{
if (gz->strm.avail_in != 0) /* If there is some data in receiver buffer, then decmpress it */
{
rc = inflate(&gz->strm, Z_NO_FLUSH);
if (rc == Z_STREAM_END)
{
gz->eof = 1;
}
else if (rc != Z_OK)
{
return -1;
}
if (gz->strm.avail_out != size)
{
return size - gz->strm.avail_out;
}
if (gz->strm.avail_in == 0)
{
gz->strm.next_in = gz->buf;
}
}
else
{
gz->strm.next_in = gz->buf;
}
rc = read(gz->fd, gz->strm.next_in + gz->strm.avail_in, gz->buf + ZLIB_BUFFER_SIZE - gz->strm.next_in - gz->strm.avail_in);
if (rc > 0)
{
gz->strm.avail_in += rc;
}
else
{
if (rc == 0)
{
gz->eof = 1;
}
return rc;
}
}
IO_CHECK(fio_write(fd, buf, size), size);
free(buf);
return fio_close(fd);
}
else
{
return gzclose(file);
return gzread(f, buf, size);
}
}
int
fio_gzwrite(gzFile f, void const* buf, unsigned size)
{
if ((size_t)f & FIO_GZ_REMOTE_MARKER)
{
int rc;
fioGZFile* gz = (fioGZFile*)((size_t)f - FIO_GZ_REMOTE_MARKER);
gz->strm.next_in = (Bytef *)buf;
gz->strm.avail_in = size;
do
{
if (gz->strm.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */
{
gz->strm.next_out = gz->buf; /* Reset pointer to the beginning of buffer */
if (gz->strm.avail_in != 0) /* Has something in input buffer */
{
rc = deflate(&gz->strm, Z_NO_FLUSH);
Assert(rc == Z_OK);
gz->strm.next_out = gz->buf; /* Reset pointer to the beginning of bufer */
}
else
{
break;
}
}
rc = write(gz->fd, gz->strm.next_out, ZLIB_BUFFER_SIZE - gz->strm.avail_out);
if (rc >= 0)
{
gz->strm.next_out += rc;
gz->strm.avail_out += rc;
}
else
{
return rc;
}
} while (gz->strm.avail_out != ZLIB_BUFFER_SIZE || gz->strm.avail_in != 0);
return size;
}
else
{
return gzwrite(f, buf, size);
}
}
int
fio_gzclose(gzFile f)
{
if ((size_t)f & FIO_GZ_REMOTE_MARKER)
{
fioGZFile* gz = (fioGZFile*)((size_t)f - FIO_GZ_REMOTE_MARKER);
int rc;
if (gz->compress)
{
gz->strm.next_out = gz->buf;
rc = deflate(&gz->strm, Z_FINISH);
Assert(rc == Z_STREAM_END && gz->strm.avail_out != ZLIB_BUFFER_SIZE);
deflateEnd(&gz->strm);
rc = write(gz->fd, gz->buf, ZLIB_BUFFER_SIZE - gz->strm.avail_out);
if (rc != ZLIB_BUFFER_SIZE - gz->strm.avail_out)
{
return -1;
}
}
else
{
inflateEnd(&gz->strm);
}
rc = fio_close(gz->fd);
free(gz);
return rc;
}
else
{
return gzclose(f);
}
}
int fio_gzeof(gzFile f)
{
if ((size_t)f & FIO_GZ_REMOTE_MARKER)
{
fioGZFile* gz = (fioGZFile*)((size_t)f - FIO_GZ_REMOTE_MARKER);
return gz->eof;
}
else
{
return gzeof(f);
}
}
z_off_t fio_gzseek(gzFile f, z_off_t offset, int whence)
{
Assert(!((size_t)f & FIO_GZ_REMOTE_MARKER));
return gzseek(f, offset, whence);
}
#endif
/* Send file content */

View File

@ -105,8 +105,12 @@ extern FILE* fio_open_stream(char const* name, fio_location location);
extern int fio_close_stream(FILE* f);
#ifdef HAVE_LIBZ
extern gzFile fio_gzopen(char const* path, char const* mode, int* tmp_fd, fio_location location);
extern int fio_gzclose(gzFile file, char const* path, int tmp_fd);
extern gzFile fio_gzopen(char const* path, char const* mode, int level, fio_location location);
extern int fio_gzclose(gzFile file);
extern int fio_gzread(gzFile f, void *buf, unsigned size);
extern int fio_gzwrite(gzFile f, void const* buf, unsigned size);
extern int fio_gzeof(gzFile f);
extern z_off_t fio_gzseek(gzFile f, z_off_t offset, int whence);
#endif
#endif