diff --git a/src/archive.c b/src/archive.c index 8866ce66..7df3837d 100644 --- a/src/archive.c +++ b/src/archive.c @@ -1556,6 +1556,9 @@ get_wal_file_internal(const char *from_path, const char *to_path, FILE *out, } goto cleanup; } + + /* disable stdio buffering */ + setvbuf(out, NULL, _IONBF, BUFSIZ); } #ifdef HAVE_LIBZ else diff --git a/src/data.c b/src/data.c index ce8beffd..c49b8283 100644 --- a/src/data.c +++ b/src/data.c @@ -566,8 +566,8 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, datapagemap_iterator_t *iter = NULL; /* stdio buffers */ - char in_buffer[STDIO_BUFSIZE]; - char out_buffer[STDIO_BUFSIZE]; + char *in_buf = NULL; + char *out_buf = NULL; /* sanity */ if (file->size % BLCKSZ != 0) @@ -634,17 +634,12 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, from_fullpath, strerror(errno)); } - if (!fio_is_remote_file(in)) - setvbuf(in, in_buffer, _IOFBF, STDIO_BUFSIZE); - /* open backup file for write */ out = fopen(to_fullpath, PG_BINARY_W); if (out == NULL) elog(ERROR, "Cannot open backup file \"%s\": %s", to_fullpath, strerror(errno)); - setvbuf(out, out_buffer, _IOFBF, STDIO_BUFSIZE); - /* update file permission */ if (chmod(to_fullpath, FILE_PERMISSION) == -1) elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath, @@ -667,6 +662,24 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, else use_pagemap = true; + if (!fio_is_remote_file(in)) + { + /* enable stdio buffering for local input file, + * unless the pagemap is involved, which + * imply a lot of random access. + */ + if (use_pagemap) + setvbuf(in, NULL, _IONBF, BUFSIZ); + else + { + in_buf = pgut_malloc(STDIO_BUFSIZE); + setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE); + } + } + + /* enable stdio buffering for output file */ + out_buf = pgut_malloc(STDIO_BUFSIZE); + setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE); /* Remote mode */ if (fio_is_remote_file(in)) @@ -789,6 +802,9 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, elog(ERROR, "Cannot remove file \"%s\": %s", to_fullpath, strerror(errno)); } + + pg_free(in_buf); + pg_free(out_buf); } /* @@ -837,18 +853,18 @@ backup_non_data_file(pgFile *file, pgFile *prev_file, size_t restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out, const char *to_fullpath) { - int i; + int i; size_t total_write_len = 0; - char buffer[STDIO_BUFSIZE]; + char *in_buf; for (i = parray_num(parent_chain) - 1; i >= 0; i--) { - char from_root[MAXPGPATH]; - char from_fullpath[MAXPGPATH]; - FILE *in = NULL; + char from_root[MAXPGPATH]; + char from_fullpath[MAXPGPATH]; + FILE *in = NULL; - pgFile **res_file = NULL; - pgFile *tmp_file = NULL; + pgFile **res_file = NULL; + pgFile *tmp_file = NULL; pgBackup *backup = (pgBackup *) parray_get(parent_chain, i); @@ -886,7 +902,8 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out, const char elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath, strerror(errno)); - setvbuf(in, buffer, _IOFBF, STDIO_BUFSIZE); + in_buf = pgut_malloc(STDIO_BUFSIZE); + setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE); /* * Restore the file. @@ -902,6 +919,8 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out, const char elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath, strerror(errno)); } + pg_free(in_buf); + return total_write_len; } @@ -912,6 +931,21 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers BackupPageHeader header; BlockNumber blknum = 0; size_t write_len = 0; + off_t cur_pos = 0; + + /* + * We rely on stdio buffering of input and output. + * For buffering to be efficient, we try to minimize the + * number of lseek syscalls, because it forces buffer flush. + * For that, we track current write position in + * output file and issue fseek only when offset of block to be + * written not equal to current write position, which happens + * a lot when blocks from incremental backup are restored, + * but should never happen in case of blocks from FULL backup. + */ + if (fio_fseek(out, cur_pos) < 0) + elog(ERROR, "Cannot seek block %u of \"%s\": %s", + blknum, to_fullpath, strerror(errno)); for (;;) { @@ -928,23 +962,24 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers /* read BackupPageHeader */ read_len = fread(&header, 1, sizeof(header), in); + if (ferror(in)) + elog(ERROR, "Cannot read header of block %u of \"%s\": %s", + blknum, from_fullpath, strerror(errno)); + if (read_len != sizeof(header)) { - int errno_tmp = errno; if (read_len == 0 && feof(in)) break; /* EOF found */ - else if (read_len != 0 && feof(in)) + + if (read_len != 0 && feof(in)) elog(ERROR, "Odd size page found at block %u of \"%s\"", blknum, from_fullpath); - else - elog(ERROR, "Cannot read header of block %u of \"%s\": %s", - blknum, from_fullpath, strerror(errno_tmp)); } /* Consider empty blockm. wtf empty block ? */ if (header.block == 0 && header.compressed_size == 0) { - elog(VERBOSE, "Skip empty block of \"%s\"", from_fullpath); + elog(WARNING, "Skip empty block of \"%s\"", from_fullpath); continue; } @@ -1019,14 +1054,19 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers is_compressed = true; } - write_pos = blknum * BLCKSZ; - /* * Seek and write the restored page. + * When restoring file from FULL backup, pages are written sequentially, + * so there is no need to issue fseek for every page. */ - if (fio_fseek(out, write_pos) < 0) - elog(ERROR, "Cannot seek block %u of \"%s\": %s", - blknum, to_fullpath, strerror(errno)); + write_pos = blknum * BLCKSZ; + + if (cur_pos != write_pos) + { + if (fio_fseek(out, blknum * BLCKSZ) < 0) + elog(ERROR, "Cannot seek block %u of \"%s\": %s", + blknum, to_fullpath, strerror(errno)); + } /* If page is compressed and restore is in remote mode, send compressed * page to the remote side. @@ -1048,6 +1088,7 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers } write_len += BLCKSZ; + cur_pos = write_pos + BLCKSZ; /* update current write position */ } elog(VERBOSE, "Copied file \"%s\": %lu bytes", from_fullpath, write_len); @@ -1063,8 +1104,8 @@ void restore_non_data_file_internal(FILE *in, FILE *out, pgFile *file, const char *from_fullpath, const char *to_fullpath) { - ssize_t read_len = 0; - char buf[STDIO_BUFSIZE]; /* 64kB buffer */ + size_t read_len = 0; + char *buf = pgut_malloc(STDIO_BUFSIZE); /* 64kB buffer */ /* copy content */ for (;;) @@ -1075,20 +1116,25 @@ restore_non_data_file_internal(FILE *in, FILE *out, pgFile *file, if (interrupted || thread_interrupted) elog(ERROR, "Interrupted during non-data file restore"); - read_len = fread(buf, 1, sizeof(buf), in); + read_len = fread(buf, 1, STDIO_BUFSIZE, in); - if (read_len == 0 && feof(in)) - break; - - if (read_len < 0) + if (ferror(in)) elog(ERROR, "Cannot read backup file \"%s\": %s", - from_fullpath, strerror(errno)); + from_fullpath, strerror(errno)); - if (fio_fwrite(out, buf, read_len) != read_len) - elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, - strerror(errno)); + if (read_len > 0) + { + if (fio_fwrite(out, buf, read_len) != read_len) + elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, + strerror(errno)); + } + + if (feof(in)) + break; } + pg_free(buf); + elog(VERBOSE, "Copied file \"%s\": %lu bytes", from_fullpath, file->write_size); } @@ -1103,7 +1149,6 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup, pgFile *tmp_file = NULL; pgBackup *tmp_backup = NULL; - char buffer[STDIO_BUFSIZE]; /* Check if full copy of destination file is available in destination backup */ if (dest_file->write_size > 0) @@ -1176,7 +1221,8 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup, elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath, strerror(errno)); - setvbuf(in, buffer, _IOFBF, STDIO_BUFSIZE); + /* disable stdio buffering for non-data files */ + setvbuf(in, NULL, _IONBF, BUFSIZ); /* do actual work */ restore_non_data_file_internal(in, out, tmp_file, from_fullpath, to_fullpath); @@ -1192,6 +1238,7 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup, * Copy file to backup. * We do not apply compression to these files, because * it is either small control file or already compressed cfs file. + * TODO: optimize remote copying */ void backup_non_data_file_internal(const char *from_fullpath, @@ -1199,10 +1246,10 @@ backup_non_data_file_internal(const char *from_fullpath, const char *to_fullpath, pgFile *file, bool missing_ok) { - FILE *in; - FILE *out; - ssize_t read_len = 0; - char buf[STDIO_BUFSIZE]; /* 64kB buffer */ + FILE *in; + FILE *out; + ssize_t read_len = 0; + char *buf; pg_crc32 crc; INIT_FILE_CRC32(true, crc); @@ -1247,18 +1294,26 @@ backup_non_data_file_internal(const char *from_fullpath, elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath, strerror(errno)); + /* disable stdio buffering for local input/output files */ + if (!fio_is_remote_file(in)) + setvbuf(in, NULL, _IONBF, BUFSIZ); + setvbuf(out, NULL, _IONBF, BUFSIZ); + + /* allocate 64kB buffer */ + buf = pgut_malloc(STDIO_BUFSIZE); + /* copy content and calc CRC */ for (;;) { - read_len = fio_fread(in, buf, sizeof(buf)); - - if (read_len == 0) - break; + read_len = fio_fread(in, buf, STDIO_BUFSIZE); if (read_len < 0) elog(ERROR, "Cannot read from source file \"%s\": %s", from_fullpath, strerror(errno)); + if (read_len == 0) + break; + if (fwrite(buf, 1, read_len, out) != read_len) elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, strerror(errno)); @@ -1267,6 +1322,19 @@ backup_non_data_file_internal(const char *from_fullpath, COMP_FILE_CRC32(true, crc, buf, read_len); file->read_size += read_len; + +// if (read_len < STDIO_BUFSIZE) +// { +// if (!fio_is_remote_file(in)) +// { +// if (ferror(in)) +// elog(ERROR, "Cannot read from source file \"%s\": %s", +// from_fullpath, strerror(errno)); +// +// if (feof(in)) +// break; +// } +// } } file->write_size = (int64) file->read_size; @@ -1280,6 +1348,7 @@ backup_non_data_file_internal(const char *from_fullpath, if (fclose(out)) elog(ERROR, "Cannot write \"%s\": %s", to_fullpath, strerror(errno)); fio_fclose(in); + pg_free(buf); } /* @@ -1478,9 +1547,13 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, uint32 checksum_version, /* read BackupPageHeader */ read_len = fread(&header, 1, sizeof(header), in); + + if (ferror(in)) + elog(ERROR, "Cannot read header of block %u of \"%s\": %s", + blknum, file->path, strerror(errno)); + if (read_len != sizeof(header)) { - int errno_tmp = errno; if (read_len == 0 && feof(in)) break; /* EOF found */ else if (read_len != 0 && feof(in)) @@ -1489,7 +1562,7 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, uint32 checksum_version, blknum, file->path); else elog(WARNING, "Cannot read header of block %u of \"%s\": %s", - blknum, file->path, strerror(errno_tmp)); + blknum, file->path, strerror(errno)); return false; } diff --git a/src/dir.c b/src/dir.c index 5b8fcf8d..5b5cd2b8 100644 --- a/src/dir.c +++ b/src/dir.c @@ -265,7 +265,7 @@ pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok) { FILE *fp; pg_crc32 crc = 0; - char buf[STDIO_BUFSIZE]; + char *buf; size_t len = 0; INIT_FILE_CRC32(use_crc32c, crc); @@ -287,30 +287,31 @@ pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok) file_path, strerror(errno)); } + /* disable stdio buffering */ + setvbuf(fp, NULL, _IONBF, BUFSIZ); + buf = pgut_malloc(STDIO_BUFSIZE); + /* calc CRC of file */ for (;;) { if (interrupted) elog(ERROR, "interrupted during CRC calculation"); - len = fread(&buf, 1, sizeof(buf), fp); + len = fread(buf, 1, STDIO_BUFSIZE, fp); - if (len == 0) - { - /* we either run into eof or error */ - if (feof(fp)) - break; - - if (ferror(fp)) - elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno)); - } + if (ferror(fp)) + elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno)); /* update CRC */ COMP_FILE_CRC32(use_crc32c, crc, buf, len); + + if (feof(fp)) + break; } FIN_FILE_CRC32(use_crc32c, crc); fclose(fp); + pg_free(buf); return crc; } @@ -324,11 +325,11 @@ pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok) pg_crc32 pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok) { - gzFile fp; - pg_crc32 crc = 0; - char buf[STDIO_BUFSIZE]; - int len = 0; - int err; + gzFile fp; + pg_crc32 crc = 0; + int len = 0; + int err; + char *buf; INIT_FILE_CRC32(use_crc32c, crc); @@ -349,13 +350,15 @@ pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok) file_path, strerror(errno)); } + buf = pgut_malloc(STDIO_BUFSIZE); + /* calc CRC of file */ for (;;) { if (interrupted) elog(ERROR, "interrupted during CRC calculation"); - len = gzread(fp, &buf, sizeof(buf)); + len = gzread(fp, buf, STDIO_BUFSIZE); if (len <= 0) { @@ -377,6 +380,7 @@ pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok) FIN_FILE_CRC32(use_crc32c, crc); gzclose(fp); + pg_free(buf); return crc; } @@ -1505,11 +1509,16 @@ dir_read_file_list(const char *root, const char *external_prefix, FILE *fp; parray *files; char buf[MAXPGPATH * 2]; + char stdio_buf[STDIO_BUFSIZE]; fp = fio_open_stream(file_txt, location); if (fp == NULL) elog(ERROR, "cannot open \"%s\": %s", file_txt, strerror(errno)); + /* enable stdio buffering for local file */ + if (!fio_is_remote(location)) + setvbuf(fp, stdio_buf, _IOFBF, STDIO_BUFSIZE); + files = parray_new(); while (fgets(buf, lengthof(buf), fp)) diff --git a/src/restore.c b/src/restore.c index 49ff04b3..767e9a57 100644 --- a/src/restore.c +++ b/src/restore.c @@ -756,10 +756,10 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain, static void * restore_files(void *arg) { - int i; - char to_fullpath[MAXPGPATH]; - FILE *out = NULL; - char buffer[STDIO_BUFSIZE]; + int i; + char to_fullpath[MAXPGPATH]; + FILE *out = NULL; + char *out_buf = pgut_malloc(STDIO_BUFSIZE); restore_files_arg *arguments = (restore_files_arg *) arg; @@ -856,18 +856,25 @@ restore_files(void *arg) if (dest_file->write_size == 0) goto done; - if (!fio_is_remote_file(out)) - setvbuf(out, buffer, _IOFBF, STDIO_BUFSIZE); - /* Restore destination file */ if (dest_file->is_datafile && !dest_file->is_cfs) + { + /* enable stdio buffering for local destination file */ + if (!fio_is_remote_file(out)) + setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE); /* Destination file is data file */ arguments->restored_bytes += restore_data_file(arguments->parent_chain, dest_file, out, to_fullpath); + } else + { + /* disable stdio buffering for local destination file */ + if (!fio_is_remote_file(out)) + setvbuf(out, NULL, _IONBF, BUFSIZ); /* Destination file is non-data file */ arguments->restored_bytes += restore_non_data_file(arguments->parent_chain, arguments->dest_backup, dest_file, out, to_fullpath); + } done: /* close file */ @@ -876,6 +883,8 @@ done: strerror(errno)); } + free(out_buf); + /* ssh connection to longer needed */ fio_disconnect(); diff --git a/src/utils/file.c b/src/utils/file.c index b53f5ccf..bb05f379 100644 --- a/src/utils/file.c +++ b/src/utils/file.c @@ -1470,6 +1470,7 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn, return n_blocks_read; } +/* TODO: read file using large buffer */ static void fio_send_pages_impl(int fd, int out, char* buf, bool with_pagemap) { BlockNumber blknum = 0; @@ -1881,7 +1882,7 @@ static void fio_send_file_impl(int out, char const* path) FILE *fp; fio_header hdr; char *buf = pgut_malloc(CHUNK_SIZE); - ssize_t read_len = 0; + size_t read_len = 0; char *errormsg = NULL; /* open source file for read */ @@ -1917,13 +1918,16 @@ static void fio_send_file_impl(int out, char const* path) goto cleanup; } + /* disable stdio buffering */ + setvbuf(fp, NULL, _IONBF, BUFSIZ); + /* copy content */ for (;;) { read_len = fread(buf, 1, CHUNK_SIZE, fp); /* report error */ - if (read_len < 0 || (read_len == 0 && !feof(fp))) + if (ferror(fp)) { hdr.cop = FIO_ERROR; errormsg = pgut_malloc(MAXPGPATH); @@ -1938,9 +1942,7 @@ static void fio_send_file_impl(int out, char const* path) goto cleanup; } - else if (read_len == 0) - break; - else + if (read_len > 0) { /* send chunk */ hdr.cop = FIO_PAGE; @@ -1948,6 +1950,9 @@ static void fio_send_file_impl(int out, char const* path) IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); IO_CHECK(fio_write_all(out, buf, read_len), read_len); } + + if (feof(fp)) + break; } /* we are done, send eof */