1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-07 14:18:17 +02:00

improvement: use dynamically allocated buffers for stdio buffering

This commit is contained in:
Grigory Smolkin 2020-04-23 22:56:56 +03:00
parent 54f63319be
commit a9aab29a04
5 changed files with 177 additions and 78 deletions

View File

@ -1556,6 +1556,9 @@ get_wal_file_internal(const char *from_path, const char *to_path, FILE *out,
}
goto cleanup;
}
/* disable stdio buffering */
setvbuf(out, NULL, _IONBF, BUFSIZ);
}
#ifdef HAVE_LIBZ
else

View File

@ -566,8 +566,8 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
datapagemap_iterator_t *iter = NULL;
/* stdio buffers */
char in_buffer[STDIO_BUFSIZE];
char out_buffer[STDIO_BUFSIZE];
char *in_buf = NULL;
char *out_buf = NULL;
/* sanity */
if (file->size % BLCKSZ != 0)
@ -634,17 +634,12 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
from_fullpath, strerror(errno));
}
if (!fio_is_remote_file(in))
setvbuf(in, in_buffer, _IOFBF, STDIO_BUFSIZE);
/* open backup file for write */
out = fopen(to_fullpath, PG_BINARY_W);
if (out == NULL)
elog(ERROR, "Cannot open backup file \"%s\": %s",
to_fullpath, strerror(errno));
setvbuf(out, out_buffer, _IOFBF, STDIO_BUFSIZE);
/* update file permission */
if (chmod(to_fullpath, FILE_PERMISSION) == -1)
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
@ -667,6 +662,24 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
else
use_pagemap = true;
if (!fio_is_remote_file(in))
{
/* enable stdio buffering for local input file,
* unless the pagemap is involved, which
* implies a lot of random access.
*/
if (use_pagemap)
setvbuf(in, NULL, _IONBF, BUFSIZ);
else
{
in_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
}
}
/* enable stdio buffering for output file */
out_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
/* Remote mode */
if (fio_is_remote_file(in))
@ -789,6 +802,9 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
elog(ERROR, "Cannot remove file \"%s\": %s", to_fullpath,
strerror(errno));
}
pg_free(in_buf);
pg_free(out_buf);
}
/*
@ -837,18 +853,18 @@ backup_non_data_file(pgFile *file, pgFile *prev_file,
size_t
restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out, const char *to_fullpath)
{
int i;
int i;
size_t total_write_len = 0;
char buffer[STDIO_BUFSIZE];
char *in_buf;
for (i = parray_num(parent_chain) - 1; i >= 0; i--)
{
char from_root[MAXPGPATH];
char from_fullpath[MAXPGPATH];
FILE *in = NULL;
char from_root[MAXPGPATH];
char from_fullpath[MAXPGPATH];
FILE *in = NULL;
pgFile **res_file = NULL;
pgFile *tmp_file = NULL;
pgFile **res_file = NULL;
pgFile *tmp_file = NULL;
pgBackup *backup = (pgBackup *) parray_get(parent_chain, i);
@ -886,7 +902,8 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out, const char
elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath,
strerror(errno));
setvbuf(in, buffer, _IOFBF, STDIO_BUFSIZE);
in_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
/*
* Restore the file.
@ -902,6 +919,8 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out, const char
elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath,
strerror(errno));
}
pg_free(in_buf);
return total_write_len;
}
@ -912,6 +931,21 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
BackupPageHeader header;
BlockNumber blknum = 0;
size_t write_len = 0;
off_t cur_pos = 0;
/*
* We rely on stdio buffering of input and output.
* For buffering to be efficient, we try to minimize the
* number of lseek syscalls, because each one forces a buffer flush.
* For that, we track the current write position in the
* output file and issue fseek only when the offset of the block to be
* written is not equal to the current write position, which happens
* a lot when blocks from an incremental backup are restored,
* but should never happen in case of blocks from a FULL backup.
*/
if (fio_fseek(out, cur_pos) < 0)
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
for (;;)
{
@ -928,23 +962,24 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
/* read BackupPageHeader */
read_len = fread(&header, 1, sizeof(header), in);
if (ferror(in))
elog(ERROR, "Cannot read header of block %u of \"%s\": %s",
blknum, from_fullpath, strerror(errno));
if (read_len != sizeof(header))
{
int errno_tmp = errno;
if (read_len == 0 && feof(in))
break; /* EOF found */
else if (read_len != 0 && feof(in))
if (read_len != 0 && feof(in))
elog(ERROR, "Odd size page found at block %u of \"%s\"",
blknum, from_fullpath);
else
elog(ERROR, "Cannot read header of block %u of \"%s\": %s",
blknum, from_fullpath, strerror(errno_tmp));
}
/* Consider empty block. TODO: clarify how an empty block can occur */
if (header.block == 0 && header.compressed_size == 0)
{
elog(VERBOSE, "Skip empty block of \"%s\"", from_fullpath);
elog(WARNING, "Skip empty block of \"%s\"", from_fullpath);
continue;
}
@ -1019,14 +1054,19 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
is_compressed = true;
}
write_pos = blknum * BLCKSZ;
/*
* Seek and write the restored page.
* When restoring file from FULL backup, pages are written sequentially,
* so there is no need to issue fseek for every page.
*/
if (fio_fseek(out, write_pos) < 0)
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
write_pos = blknum * BLCKSZ;
if (cur_pos != write_pos)
{
if (fio_fseek(out, blknum * BLCKSZ) < 0)
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}
/* If page is compressed and restore is in remote mode, send compressed
* page to the remote side.
@ -1048,6 +1088,7 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
}
write_len += BLCKSZ;
cur_pos = write_pos + BLCKSZ; /* update current write position */
}
elog(VERBOSE, "Copied file \"%s\": %lu bytes", from_fullpath, write_len);
@ -1063,8 +1104,8 @@ void
restore_non_data_file_internal(FILE *in, FILE *out, pgFile *file,
const char *from_fullpath, const char *to_fullpath)
{
ssize_t read_len = 0;
char buf[STDIO_BUFSIZE]; /* 64kB buffer */
size_t read_len = 0;
char *buf = pgut_malloc(STDIO_BUFSIZE); /* 64kB buffer */
/* copy content */
for (;;)
@ -1075,20 +1116,25 @@ restore_non_data_file_internal(FILE *in, FILE *out, pgFile *file,
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during non-data file restore");
read_len = fread(buf, 1, sizeof(buf), in);
read_len = fread(buf, 1, STDIO_BUFSIZE, in);
if (read_len == 0 && feof(in))
break;
if (read_len < 0)
if (ferror(in))
elog(ERROR, "Cannot read backup file \"%s\": %s",
from_fullpath, strerror(errno));
from_fullpath, strerror(errno));
if (fio_fwrite(out, buf, read_len) != read_len)
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath,
strerror(errno));
if (read_len > 0)
{
if (fio_fwrite(out, buf, read_len) != read_len)
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath,
strerror(errno));
}
if (feof(in))
break;
}
pg_free(buf);
elog(VERBOSE, "Copied file \"%s\": %lu bytes", from_fullpath, file->write_size);
}
@ -1103,7 +1149,6 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup,
pgFile *tmp_file = NULL;
pgBackup *tmp_backup = NULL;
char buffer[STDIO_BUFSIZE];
/* Check if full copy of destination file is available in destination backup */
if (dest_file->write_size > 0)
@ -1176,7 +1221,8 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup,
elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath,
strerror(errno));
setvbuf(in, buffer, _IOFBF, STDIO_BUFSIZE);
/* disable stdio buffering for non-data files */
setvbuf(in, NULL, _IONBF, BUFSIZ);
/* do actual work */
restore_non_data_file_internal(in, out, tmp_file, from_fullpath, to_fullpath);
@ -1192,6 +1238,7 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup,
* Copy file to backup.
* We do not apply compression to these files, because
* it is either small control file or already compressed cfs file.
* TODO: optimize remote copying
*/
void
backup_non_data_file_internal(const char *from_fullpath,
@ -1199,10 +1246,10 @@ backup_non_data_file_internal(const char *from_fullpath,
const char *to_fullpath, pgFile *file,
bool missing_ok)
{
FILE *in;
FILE *out;
ssize_t read_len = 0;
char buf[STDIO_BUFSIZE]; /* 64kB buffer */
FILE *in;
FILE *out;
ssize_t read_len = 0;
char *buf;
pg_crc32 crc;
INIT_FILE_CRC32(true, crc);
@ -1247,18 +1294,26 @@ backup_non_data_file_internal(const char *from_fullpath,
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
strerror(errno));
/* disable stdio buffering for local input/output files */
if (!fio_is_remote_file(in))
setvbuf(in, NULL, _IONBF, BUFSIZ);
setvbuf(out, NULL, _IONBF, BUFSIZ);
/* allocate 64kB buffer */
buf = pgut_malloc(STDIO_BUFSIZE);
/* copy content and calc CRC */
for (;;)
{
read_len = fio_fread(in, buf, sizeof(buf));
if (read_len == 0)
break;
read_len = fio_fread(in, buf, STDIO_BUFSIZE);
if (read_len < 0)
elog(ERROR, "Cannot read from source file \"%s\": %s",
from_fullpath, strerror(errno));
if (read_len == 0)
break;
if (fwrite(buf, 1, read_len, out) != read_len)
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath,
strerror(errno));
@ -1267,6 +1322,19 @@ backup_non_data_file_internal(const char *from_fullpath,
COMP_FILE_CRC32(true, crc, buf, read_len);
file->read_size += read_len;
// if (read_len < STDIO_BUFSIZE)
// {
// if (!fio_is_remote_file(in))
// {
// if (ferror(in))
// elog(ERROR, "Cannot read from source file \"%s\": %s",
// from_fullpath, strerror(errno));
//
// if (feof(in))
// break;
// }
// }
}
file->write_size = (int64) file->read_size;
@ -1280,6 +1348,7 @@ backup_non_data_file_internal(const char *from_fullpath,
if (fclose(out))
elog(ERROR, "Cannot write \"%s\": %s", to_fullpath, strerror(errno));
fio_fclose(in);
pg_free(buf);
}
/*
@ -1478,9 +1547,13 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, uint32 checksum_version,
/* read BackupPageHeader */
read_len = fread(&header, 1, sizeof(header), in);
if (ferror(in))
elog(ERROR, "Cannot read header of block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
if (read_len != sizeof(header))
{
int errno_tmp = errno;
if (read_len == 0 && feof(in))
break; /* EOF found */
else if (read_len != 0 && feof(in))
@ -1489,7 +1562,7 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, uint32 checksum_version,
blknum, file->path);
else
elog(WARNING, "Cannot read header of block %u of \"%s\": %s",
blknum, file->path, strerror(errno_tmp));
blknum, file->path, strerror(errno));
return false;
}

View File

@ -265,7 +265,7 @@ pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok)
{
FILE *fp;
pg_crc32 crc = 0;
char buf[STDIO_BUFSIZE];
char *buf;
size_t len = 0;
INIT_FILE_CRC32(use_crc32c, crc);
@ -287,30 +287,31 @@ pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok)
file_path, strerror(errno));
}
/* disable stdio buffering */
setvbuf(fp, NULL, _IONBF, BUFSIZ);
buf = pgut_malloc(STDIO_BUFSIZE);
/* calc CRC of file */
for (;;)
{
if (interrupted)
elog(ERROR, "interrupted during CRC calculation");
len = fread(&buf, 1, sizeof(buf), fp);
len = fread(buf, 1, STDIO_BUFSIZE, fp);
if (len == 0)
{
/* we either run into eof or error */
if (feof(fp))
break;
if (ferror(fp))
elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno));
}
if (ferror(fp))
elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno));
/* update CRC */
COMP_FILE_CRC32(use_crc32c, crc, buf, len);
if (feof(fp))
break;
}
FIN_FILE_CRC32(use_crc32c, crc);
fclose(fp);
pg_free(buf);
return crc;
}
@ -324,11 +325,11 @@ pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok)
pg_crc32
pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok)
{
gzFile fp;
pg_crc32 crc = 0;
char buf[STDIO_BUFSIZE];
int len = 0;
int err;
gzFile fp;
pg_crc32 crc = 0;
int len = 0;
int err;
char *buf;
INIT_FILE_CRC32(use_crc32c, crc);
@ -349,13 +350,15 @@ pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok)
file_path, strerror(errno));
}
buf = pgut_malloc(STDIO_BUFSIZE);
/* calc CRC of file */
for (;;)
{
if (interrupted)
elog(ERROR, "interrupted during CRC calculation");
len = gzread(fp, &buf, sizeof(buf));
len = gzread(fp, buf, STDIO_BUFSIZE);
if (len <= 0)
{
@ -377,6 +380,7 @@ pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok)
FIN_FILE_CRC32(use_crc32c, crc);
gzclose(fp);
pg_free(buf);
return crc;
}
@ -1505,11 +1509,16 @@ dir_read_file_list(const char *root, const char *external_prefix,
FILE *fp;
parray *files;
char buf[MAXPGPATH * 2];
char stdio_buf[STDIO_BUFSIZE];
fp = fio_open_stream(file_txt, location);
if (fp == NULL)
elog(ERROR, "cannot open \"%s\": %s", file_txt, strerror(errno));
/* enable stdio buffering for local file */
if (!fio_is_remote(location))
setvbuf(fp, stdio_buf, _IOFBF, STDIO_BUFSIZE);
files = parray_new();
while (fgets(buf, lengthof(buf), fp))

View File

@ -756,10 +756,10 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain,
static void *
restore_files(void *arg)
{
int i;
char to_fullpath[MAXPGPATH];
FILE *out = NULL;
char buffer[STDIO_BUFSIZE];
int i;
char to_fullpath[MAXPGPATH];
FILE *out = NULL;
char *out_buf = pgut_malloc(STDIO_BUFSIZE);
restore_files_arg *arguments = (restore_files_arg *) arg;
@ -856,18 +856,25 @@ restore_files(void *arg)
if (dest_file->write_size == 0)
goto done;
if (!fio_is_remote_file(out))
setvbuf(out, buffer, _IOFBF, STDIO_BUFSIZE);
/* Restore destination file */
if (dest_file->is_datafile && !dest_file->is_cfs)
{
/* enable stdio buffering for local destination file */
if (!fio_is_remote_file(out))
setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
/* Destination file is data file */
arguments->restored_bytes += restore_data_file(arguments->parent_chain,
dest_file, out, to_fullpath);
}
else
{
/* disable stdio buffering for local destination file */
if (!fio_is_remote_file(out))
setvbuf(out, NULL, _IONBF, BUFSIZ);
/* Destination file is non-data file */
arguments->restored_bytes += restore_non_data_file(arguments->parent_chain,
arguments->dest_backup, dest_file, out, to_fullpath);
}
done:
/* close file */
@ -876,6 +883,8 @@ done:
strerror(errno));
}
free(out_buf);
/* ssh connection no longer needed */
fio_disconnect();

View File

@ -1470,6 +1470,7 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
return n_blocks_read;
}
/* TODO: read file using large buffer */
static void fio_send_pages_impl(int fd, int out, char* buf, bool with_pagemap)
{
BlockNumber blknum = 0;
@ -1881,7 +1882,7 @@ static void fio_send_file_impl(int out, char const* path)
FILE *fp;
fio_header hdr;
char *buf = pgut_malloc(CHUNK_SIZE);
ssize_t read_len = 0;
size_t read_len = 0;
char *errormsg = NULL;
/* open source file for read */
@ -1917,13 +1918,16 @@ static void fio_send_file_impl(int out, char const* path)
goto cleanup;
}
/* disable stdio buffering */
setvbuf(fp, NULL, _IONBF, BUFSIZ);
/* copy content */
for (;;)
{
read_len = fread(buf, 1, CHUNK_SIZE, fp);
/* report error */
if (read_len < 0 || (read_len == 0 && !feof(fp)))
if (ferror(fp))
{
hdr.cop = FIO_ERROR;
errormsg = pgut_malloc(MAXPGPATH);
@ -1938,9 +1942,7 @@ static void fio_send_file_impl(int out, char const* path)
goto cleanup;
}
else if (read_len == 0)
break;
else
if (read_len > 0)
{
/* send chunk */
hdr.cop = FIO_PAGE;
@ -1948,6 +1950,9 @@ static void fio_send_file_impl(int out, char const* path)
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, buf, read_len), read_len);
}
if (feof(fp))
break;
}
/* we are done, send eof */