1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-11-25 09:01:48 +02:00

[Issue #205] Avoid opening remote files via fio_open when copying files during backup

This commit is contained in:
Grigory Smolkin 2020-05-19 19:27:01 +03:00
parent f538183098
commit 9bf20803b2
7 changed files with 400 additions and 275 deletions

View File

@ -1387,15 +1387,16 @@ get_wal_file(const char *filename, const char *from_fullpath,
*/
if (fio_is_remote(FIO_BACKUP_HOST))
{
char *errmsg = NULL;
/* get file via ssh */
#ifdef HAVE_LIBZ
/* If requested file is regular WAL segment, then try to open it with '.gz' suffix... */
if (IsXLogFileName(filename))
rc = fio_send_file_gz(from_fullpath_gz, to_fullpath, out, thread_num);
rc = fio_send_file_gz(from_fullpath_gz, to_fullpath, out, NULL, &errmsg);
if (rc == FILE_MISSING)
#endif
/* ... failing that, use uncompressed */
rc = fio_send_file(from_fullpath, to_fullpath, out, thread_num);
rc = fio_send_file(from_fullpath, to_fullpath, out, NULL, &errmsg);
/* When not in prefetch mode, try to use partial file */
if (rc == FILE_MISSING && !prefetch_mode && IsXLogFileName(filename))
@ -1405,18 +1406,27 @@ get_wal_file(const char *filename, const char *from_fullpath,
#ifdef HAVE_LIBZ
/* '.gz.partial' goes first ... */
snprintf(from_partial, sizeof(from_partial), "%s.gz.partial", from_fullpath);
rc = fio_send_file_gz(from_partial, to_fullpath, out, thread_num);
rc = fio_send_file_gz(from_partial, to_fullpath, out, NULL, &errmsg);
if (rc == FILE_MISSING)
#endif
{
/* ... failing that, use '.partial' */
snprintf(from_partial, sizeof(from_partial), "%s.partial", from_fullpath);
rc = fio_send_file(from_partial, to_fullpath, out, thread_num);
rc = fio_send_file(from_partial, to_fullpath, out, NULL, &errmsg);
}
if (rc == SEND_OK)
src_partial = true;
}
if (rc == WRITE_FAILED)
elog(WARNING, "Thread [%d]: Cannot write to file '%s': %s",
thread_num, to_fullpath, strerror(errno));
if (errmsg)
elog(WARNING, "Thread [%d]: %s", thread_num, errmsg);
pg_free(errmsg);
}
else
{

View File

@ -550,19 +550,6 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync)
elog(ERROR, "Data files transferring failed, time elapsed: %s",
pretty_time);
/* Remove disappeared during backup files from backup_list */
for (i = 0; i < parray_num(backup_files_list); i++)
{
pgFile *tmp_file = (pgFile *) parray_get(backup_files_list, i);
if (tmp_file->write_size == FILE_NOT_FOUND)
{
pgFileFree(tmp_file);
parray_remove(backup_files_list, i);
i--;
}
}
/* clean previous backup file list */
if (prev_backup_filelist)
{
@ -2150,6 +2137,12 @@ backup_files(void *arg)
current.backup_mode, current.parent_backup, true);
}
/* No point in storing empty, missing or not changed files */
if (file->write_size <= 0)
unlink(to_fullpath);
// elog(ERROR, "Cannot remove file \"%s\": %s", to_fullpath,
// strerror(errno));
if (file->write_size == FILE_NOT_FOUND)
continue;
@ -2192,11 +2185,8 @@ parse_filelist_filenames(parray *files, const char *root)
while (i < parray_num(files))
{
pgFile *file = (pgFile *) parray_get(files, i);
// char *relative;
int sscanf_result;
// relative = GetRelativePath(file->rel_path, root);
if (S_ISREG(file->mode) &&
path_is_prefix_of_path(PG_TBLSPC_DIR, file->rel_path))
{

View File

@ -1856,6 +1856,10 @@ write_backup_filelist(pgBackup *backup, parray *files, const char *root,
char line[BLCKSZ];
pgFile *file = (pgFile *) parray_get(files, i);
/* Ignore disappeared file */
if (file->write_size == FILE_NOT_FOUND)
continue;
if (S_ISDIR(file->mode))
{
backup_size_on_disk += 4096;

View File

@ -199,44 +199,44 @@ void
get_header_errormsg(Page page, char **errormsg)
{
PageHeader phdr = (PageHeader) page;
*errormsg = pgut_malloc(MAXPGPATH);
*errormsg = pgut_malloc(ERRMSG_MAX_LEN);
if (PageGetPageSize(phdr) != BLCKSZ)
snprintf(*errormsg, MAXPGPATH, "page header invalid, "
snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
"page size %lu is not equal to block size %u",
PageGetPageSize(phdr), BLCKSZ);
else if (phdr->pd_lower < SizeOfPageHeaderData)
snprintf(*errormsg, MAXPGPATH, "page header invalid, "
snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
"pd_lower %i is less than page header size %lu",
phdr->pd_lower, SizeOfPageHeaderData);
else if (phdr->pd_lower > phdr->pd_upper)
snprintf(*errormsg, MAXPGPATH, "page header invalid, "
snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
"pd_lower %u is greater than pd_upper %u",
phdr->pd_lower, phdr->pd_upper);
else if (phdr->pd_upper > phdr->pd_special)
snprintf(*errormsg, MAXPGPATH, "page header invalid, "
snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
"pd_upper %u is greater than pd_special %u",
phdr->pd_upper, phdr->pd_special);
else if (phdr->pd_special > BLCKSZ)
snprintf(*errormsg, MAXPGPATH, "page header invalid, "
snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
"pd_special %u is greater than block size %u",
phdr->pd_special, BLCKSZ);
else if (phdr->pd_special != MAXALIGN(phdr->pd_special))
snprintf(*errormsg, MAXPGPATH, "page header invalid, "
snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
"pd_special %i is misaligned, expected %lu",
phdr->pd_special, MAXALIGN(phdr->pd_special));
else if (phdr->pd_flags & ~PD_VALID_FLAG_BITS)
snprintf(*errormsg, MAXPGPATH, "page header invalid, "
snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid, "
"pd_flags mask contain illegal bits");
else
snprintf(*errormsg, MAXPGPATH, "page header invalid");
snprintf(*errormsg, ERRMSG_MAX_LEN, "page header invalid");
}
/* We know that checksums are mismatched, store specific
@ -246,9 +246,9 @@ void
get_checksum_errormsg(Page page, char **errormsg, BlockNumber absolute_blkno)
{
PageHeader phdr = (PageHeader) page;
*errormsg = pgut_malloc(MAXPGPATH);
*errormsg = pgut_malloc(ERRMSG_MAX_LEN);
snprintf(*errormsg, MAXPGPATH,
snprintf(*errormsg, ERRMSG_MAX_LEN,
"page verification failed, "
"calculated checksum %u but expected %u",
phdr->pd_checksum,
@ -555,8 +555,8 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
CompressAlg calg, int clevel, uint32 checksum_version,
int ptrack_version_num, const char *ptrack_schema, bool missing_ok)
{
FILE *in;
FILE *out;
FILE *in = NULL;
FILE *out = NULL;
BlockNumber blknum = 0;
BlockNumber nblocks = 0; /* number of blocks in source file */
BlockNumber n_blocks_skipped = 0;
@ -571,7 +571,7 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
/* sanity */
if (file->size % BLCKSZ != 0)
elog(WARNING, "File: \"%s\", invalid file size %zu", from_fullpath, file->size);
elog(WARNING, "File: '%s', invalid file size %zu", from_fullpath, file->size);
/*
* Compute expected number of blocks in the file.
@ -607,42 +607,15 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
file->uncompressed_size = 0;
INIT_FILE_CRC32(true, file->crc);
/* open source file for read */
in = fio_fopen(from_fullpath, PG_BINARY_R, FIO_DB_HOST);
if (in == NULL)
{
FIN_FILE_CRC32(true, file->crc);
/*
* If file is not found, this is not en error.
* It could have been deleted by concurrent postgres transaction.
*/
if (errno == ENOENT)
{
if (missing_ok)
{
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
return;
}
else
elog(ERROR, "File \"%s\" is not found", from_fullpath);
}
/* In all other cases throw an error */
elog(ERROR, "Cannot open file \"%s\": %s",
from_fullpath, strerror(errno));
}
/* open backup file for write */
out = fopen(to_fullpath, PG_BINARY_W);
if (out == NULL)
elog(ERROR, "Cannot open backup file \"%s\": %s",
elog(ERROR, "Cannot open backup file '%s': %s",
to_fullpath, strerror(errno));
/* update file permission */
if (chmod(to_fullpath, FILE_PERMISSION) == -1)
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
elog(ERROR, "Cannot change mode of '%s': %s", to_fullpath,
strerror(errno));
/*
@ -662,33 +635,17 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
else
use_pagemap = true;
if (!fio_is_remote_file(in))
{
/* enable stdio buffering for local input file,
* unless the pagemap is involved, which
* imply a lot of random access.
*/
if (use_pagemap)
setvbuf(in, NULL, _IONBF, BUFSIZ);
else
{
in_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
}
}
/* enable stdio buffering for output file */
out_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
/* Remote mode */
if (fio_is_remote_file(in))
if (fio_is_remote(FIO_DB_HOST))
{
char *errmsg = NULL;
BlockNumber err_blknum = 0;
/* TODO: retrying via ptrack should be implemented on the agent */
int rc = fio_send_pages(in, out, file,
int rc = fio_send_pages(out, from_fullpath, file,
/* send prev backup START_LSN */
backup_mode == BACKUP_MODE_DIFF_DELTA &&
file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
@ -699,23 +656,41 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
&err_blknum, &errmsg);
/* check for errors */
if (rc == REMOTE_ERROR)
elog(ERROR, "Cannot read block %u of \"%s\": %s",
err_blknum, from_fullpath, strerror(errno));
if (rc == FILE_MISSING)
{
elog(LOG, "File '%s' is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write block %u of '%s': %s",
err_blknum, to_fullpath, strerror(errno));
else if (rc == PAGE_CORRUPTION)
{
if (errmsg)
elog(ERROR, "Corruption detected in file \"%s\", block %u: %s",
elog(ERROR, "Corruption detected in file '%s', block %u: %s",
from_fullpath, err_blknum, errmsg);
else
elog(ERROR, "Corruption detected in file \"%s\", block %u",
elog(ERROR, "Corruption detected in file '%s', block %u",
from_fullpath, err_blknum);
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
err_blknum, to_fullpath, strerror(errno));
/* OPEN_FAILED and READ_FAILED */
else if (rc == OPEN_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Failed to open for reading remote file '%s'", from_fullpath);
}
else if (rc == READ_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Failed to read from remote file '%s'", from_fullpath);
}
file->read_size = rc * BLCKSZ;
pg_free(errmsg);
@ -724,10 +699,47 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
/* Local mode */
else
{
/* open source file for read */
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
{
/*
* If file is not found, this is not an error.
* It could have been deleted by concurrent postgres transaction.
*/
if (errno == ENOENT)
{
if (missing_ok)
{
elog(LOG, "File '%s' is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else
elog(ERROR, "File '%s' is not found", from_fullpath);
}
/* In all other cases throw an error */
elog(ERROR, "Cannot open file '%s': %s",
from_fullpath, strerror(errno));
}
/* Enable stdio buffering for local input file,
* unless the pagemap is involved, which
* implies a lot of random access.
*/
if (use_pagemap)
{
iter = datapagemap_iterate(&file->pagemap);
datapagemap_next(iter, &blknum); /* set first block */
setvbuf(in, NULL, _IONBF, BUFSIZ);
}
else
{
in_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
}
while (blknum < nblocks)
@ -776,14 +788,6 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
backup_mode == BACKUP_MODE_DIFF_DELTA)
file->n_blocks = file->read_size / BLCKSZ;
if (fclose(out))
elog(ERROR, "Cannot close the backup file \"%s\": %s",
to_fullpath, strerror(errno));
fio_fclose(in);
FIN_FILE_CRC32(true, file->crc);
/* Determine that file didn't change in case of incremental backup */
if (backup_mode != BACKUP_MODE_FULL &&
file->exists_in_prev &&
@ -793,15 +797,19 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
file->write_size = BYTES_INVALID;
}
/*
* No point in storing empty files.
*/
if (file->write_size <= 0)
{
if (unlink(to_fullpath) == -1)
elog(ERROR, "Cannot remove file \"%s\": %s", to_fullpath,
strerror(errno));
}
cleanup:
/* finish CRC calculation */
FIN_FILE_CRC32(true, file->crc);
/* close local input file */
if (in && fclose(in))
elog(ERROR, "Cannot close the source file '%s': %s",
to_fullpath, strerror(errno));
/* close local output file */
if (out && fclose(out))
elog(ERROR, "Cannot close the backup file '%s': %s",
to_fullpath, strerror(errno));
pg_free(in_buf);
pg_free(out_buf);
@ -1249,108 +1257,131 @@ backup_non_data_file_internal(const char *from_fullpath,
const char *to_fullpath, pgFile *file,
bool missing_ok)
{
FILE *in;
FILE *out;
FILE *in = NULL;
FILE *out = NULL;
ssize_t read_len = 0;
char *buf;
pg_crc32 crc;
char *buf = NULL;
INIT_FILE_CRC32(true, crc);
INIT_FILE_CRC32(true, file->crc);
/* reset size summary */
file->read_size = 0;
file->write_size = 0;
file->uncompressed_size = 0;
/* open source file for read */
in = fio_fopen(from_fullpath, PG_BINARY_R, from_location);
if (in == NULL)
{
FIN_FILE_CRC32(true, crc);
file->crc = crc;
/* maybe deleted, it's not error in case of backup */
if (errno == ENOENT)
{
if (missing_ok)
{
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
return;
}
else
elog(ERROR, "File \"%s\" is not found", from_fullpath);
}
elog(ERROR, "Cannot open source file \"%s\": %s", from_fullpath,
strerror(errno));
}
/* open backup file for write */
out = fopen(to_fullpath, PG_BINARY_W);
if (out == NULL)
elog(ERROR, "Cannot open destination file \"%s\": %s",
elog(ERROR, "Cannot open destination file '%s': %s",
to_fullpath, strerror(errno));
/* update file permission */
if (chmod(to_fullpath, file->mode) == -1)
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
elog(ERROR, "Cannot change mode of '%s': %s", to_fullpath,
strerror(errno));
/* disable stdio buffering for local input/output files */
if (!fio_is_remote_file(in))
setvbuf(in, NULL, _IONBF, BUFSIZ);
setvbuf(out, NULL, _IONBF, BUFSIZ);
/* allocate 64kB buffer */
buf = pgut_malloc(STDIO_BUFSIZE);
/* copy content and calc CRC */
for (;;)
/* backup remote file */
if (fio_is_remote(FIO_DB_HOST))
{
read_len = fio_fread(in, buf, STDIO_BUFSIZE);
char *errmsg = NULL;
int rc = fio_send_file(from_fullpath, to_fullpath, out, file, &errmsg);
if (read_len < 0)
elog(ERROR, "Cannot read from source file \"%s\": %s",
from_fullpath, strerror(errno));
/* handle errors */
if (rc == FILE_MISSING)
{
/* maybe deleted, it's not error in case of backup */
if (missing_ok)
{
elog(LOG, "File '%s' is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else
elog(ERROR, "File '%s' is not found", from_fullpath);
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write to '%s': %s", to_fullpath, strerror(errno));
else if (rc != SEND_OK)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot access remote file '%s'", from_fullpath);
}
if (read_len == 0)
break;
pg_free(errmsg);
}
/* backup local file */
else
{
/* open source file for read */
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
{
/* maybe deleted, it's not error in case of backup */
if (errno == ENOENT)
{
if (missing_ok)
{
elog(LOG, "File '%s' is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else
elog(ERROR, "File '%s' is not found", from_fullpath);
}
if (fwrite(buf, 1, read_len, out) != read_len)
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath,
elog(ERROR, "Cannot open file '%s': %s", from_fullpath,
strerror(errno));
}
/* update CRC */
COMP_FILE_CRC32(true, crc, buf, read_len);
/* disable stdio buffering for local input/output files to avoid triple buffering */
setvbuf(in, NULL, _IONBF, BUFSIZ);
setvbuf(out, NULL, _IONBF, BUFSIZ);
file->read_size += read_len;
/* allocate 64kB buffer */
buf = pgut_malloc(CHUNK_SIZE);
// if (read_len < STDIO_BUFSIZE)
// {
// if (!fio_is_remote_file(in))
// {
// if (ferror(in))
// elog(ERROR, "Cannot read from source file \"%s\": %s",
// from_fullpath, strerror(errno));
//
// if (feof(in))
// break;
// }
// }
/* copy content and calc CRC */
for (;;)
{
read_len = fread(buf, 1, CHUNK_SIZE, in);
if (ferror(in))
elog(ERROR, "Cannot read from file '%s': %s",
from_fullpath, strerror(errno));
if (read_len > 0)
{
if (fwrite(buf, 1, read_len, out) != read_len)
elog(ERROR, "Cannot write to file '%s': %s", to_fullpath,
strerror(errno));
/* update CRC */
COMP_FILE_CRC32(true, file->crc, buf, read_len);
file->read_size += read_len;
}
if (feof(in))
break;
}
}
file->write_size = (int64) file->read_size;
if (file->write_size > 0)
file->uncompressed_size = file->write_size;
/* finish CRC calculation and store into pgFile */
FIN_FILE_CRC32(true, crc);
file->crc = crc;
if (fclose(out))
elog(ERROR, "Cannot write \"%s\": %s", to_fullpath, strerror(errno));
fio_fclose(in);
cleanup:
/* finish CRC calculation and store into pgFile */
FIN_FILE_CRC32(true, file->crc);
if (in && fclose(in))
elog(ERROR, "Cannot close the file '%s': %s", from_fullpath, strerror(errno));
if (out && fclose(out))
elog(ERROR, "Cannot close the file '%s': %s", to_fullpath, strerror(errno));
pg_free(buf);
}

View File

@ -87,6 +87,10 @@ extern const char *PROGRAM_EMAIL;
/* stdio buffer size */
#define STDIO_BUFSIZE 65536
#define ERRMSG_MAX_LEN 2048
#define CHUNK_SIZE (128 * 1024)
#define OUT_BUF_SIZE (512 * 1024)
/* retry attempts */
#define PAGE_READ_ATTEMPTS 100
@ -168,9 +172,9 @@ typedef struct pgFile
*/
/* we need int64 here to store '-1' value */
pg_crc32 crc; /* CRC value of the file, regular file only */
char *rel_path; /* relative path of the file */
char *linked; /* path of the linked file */
bool is_datafile; /* true if the file is PostgreSQL data file */
char *rel_path; /* relative path of the file */
Oid tblspcOid; /* tblspcOid extracted from path, if applicable */
Oid dbOid; /* dbOid extracted from path, if applicable */
Oid relOid; /* relOid extracted from path, if applicable */
@ -178,7 +182,7 @@ typedef struct pgFile
int segno; /* Segment number for ptrack */
BlockNumber n_blocks; /* size of the data file in blocks */
bool is_cfs; /* Flag to distinguish files compressed by CFS*/
bool is_database;
bool is_database; /* Flag used strictly by ptrack 1.x backup */
int external_dir_num; /* Number of external directory. 0 if not external */
bool exists_in_prev; /* Mark files, both data and regular, that exists in previous backup */
CompressAlg compress_alg; /* compression algorithm applied to the file */
@ -1020,13 +1024,14 @@ extern XLogRecPtr get_last_ptrack_lsn(PGconn *backup_conn, PGNodeInfo *nodeInfo)
extern parray * pg_ptrack_get_pagemapset(PGconn *backup_conn, const char *ptrack_schema, XLogRecPtr lsn);
/* FIO */
extern int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
extern int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn,
int calg, int clevel, uint32 checksum_version,
datapagemap_t *pagemap, BlockNumber* err_blknum, char **errormsg);
/* return codes for fio_send_pages */
#define OUT_BUF_SIZE (512 * 1024)
extern int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, int thread_num);
extern int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out, int thread_num);
extern int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out,
pgFile *file, char **errormsg);
extern int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
pgFile *file, char **errormsg);
/* return codes for fio_send_pages() and fio_send_file() */
#define SEND_OK (0)

View File

@ -14,7 +14,6 @@
#define PRINTF_BUF_SIZE 1024
#define FILE_PERMISSIONS 0600
#define CHUNK_SIZE 1024 * 128
static __thread unsigned long fio_fdset = 0;
static __thread void* fio_stdin_buffer;
@ -33,6 +32,7 @@ typedef struct
int calg;
int clevel;
int bitmapsize;
int path_len;
} fio_send_request;
@ -1316,9 +1316,11 @@ static void fio_load_file(int out, char const* path)
* Return the number of blocks actually(!) read; attempts and
* half-read blocks are not counted.
* Return values in case of error:
* REMOTE_ERROR
* PAGE_CORRUPTION
* WRITE_FAILED
* FILE_MISSING
* OPEN_FAILED
* READ_FAILED
* PAGE_CORRUPTION
* WRITE_FAILED
*
* If none of the above, this function returns the number of blocks
* read by the remote agent.
@ -1326,7 +1328,7 @@ static void fio_load_file(int out, char const* path)
* In case of DELTA mode horizonLsn must be a valid lsn,
* otherwise it should be set to InvalidXLogRecPtr.
*/
int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn,
int calg, int clevel, uint32 checksum_version,
datapagemap_t *pagemap, BlockNumber* err_blknum,
char **errormsg)
@ -1338,22 +1340,19 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
BlockNumber n_blocks_read = 0;
BlockNumber blknum = 0;
Assert(fio_is_remote_file(in));
/* send message with header
8bytes 20bytes var
------------------------------------------------------
| fio_header | fio_send_request | BITMAP(if any) |
------------------------------------------------------
8bytes 24bytes var var
--------------------------------------------------------------
| fio_header | fio_send_request | FILE PATH | BITMAP(if any) |
--------------------------------------------------------------
*/
req.hdr.handle = fio_fileno(in) & ~FIO_PIPE_MARKER;
req.hdr.cop = FIO_SEND_PAGES;
if (pagemap)
{
req.hdr.cop = FIO_SEND_PAGES_PAGEMAP;
req.hdr.size = sizeof(fio_send_request) + pagemap->bitmapsize;
req.hdr.size = sizeof(fio_send_request) + pagemap->bitmapsize + strlen(from_fullpath) + 1;
req.arg.bitmapsize = pagemap->bitmapsize;
/* TODO: add optimization for the case of pagemap
@ -1363,8 +1362,8 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
}
else
{
req.hdr.cop = FIO_SEND_PAGES;
req.hdr.size = sizeof(fio_send_request);
req.hdr.size = sizeof(fio_send_request) + strlen(from_fullpath) + 1;
req.arg.bitmapsize = 0;
}
req.arg.nblocks = file->size/BLCKSZ;
@ -1373,6 +1372,7 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
req.arg.checksumVersion = checksum_version;
req.arg.calg = calg;
req.arg.clevel = clevel;
req.arg.path_len = strlen(from_fullpath) + 1;
file->compress_alg = calg; /* TODO: wtf? why here? */
@ -1385,10 +1385,14 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
// pg_free(iter);
//<-----
/* send header */
IO_CHECK(fio_write_all(fio_stdout, &req, sizeof(req)), sizeof(req));
/* send file path */
IO_CHECK(fio_write_all(fio_stdout, from_fullpath, req.arg.path_len), req.arg.path_len);
/* send pagemap if any */
if (pagemap)
/* now send pagemap itself */
IO_CHECK(fio_write_all(fio_stdout, pagemap->bitmap, pagemap->bitmapsize), pagemap->bitmapsize);
while (true)
@ -1402,9 +1406,15 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
if (hdr.cop == FIO_ERROR)
{
errno = hdr.arg;
*err_blknum = hdr.size;
return REMOTE_ERROR;
/* FILE_MISSING, OPEN_FAILED and READ_FAILED */
if (hdr.size > 0)
{
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
*errormsg = pgut_malloc(hdr.size);
snprintf(*errormsg, hdr.size, "%s", buf);
}
return hdr.arg;
}
else if (hdr.cop == FIO_SEND_FILE_CORRUPTION)
{
@ -1414,7 +1424,7 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
{
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
*errormsg = pgut_malloc(hdr.size);
strncpy(*errormsg, buf, hdr.size);
snprintf(*errormsg, hdr.size, "%s", buf);
}
return PAGE_CORRUPTION;
}
@ -1449,66 +1459,134 @@ int fio_send_pages(FILE* in, FILE* out, pgFile *file, XLogRecPtr horizonLsn,
return n_blocks_read;
}
/* TODO: read file using large buffer */
static void fio_send_pages_impl(int fd, int out, char* buf, bool with_pagemap)
{
BlockNumber blknum = 0;
BlockNumber n_blocks_read = 0;
XLogRecPtr page_lsn = 0;
char read_buffer[BLCKSZ+1];
fio_header hdr;
fio_send_request *req = (fio_send_request*) buf;
/* TODO: read file using large buffer
* Return codes:
* FIO_ERROR:
* FILE_MISSING (-1)
* OPEN_FAILED (-2)
* READ_FAILED (-3)
* FIO_SEND_FILE_CORRUPTION
* FIO_SEND_FILE_EOF
*/
static void fio_send_pages_impl(int out, char* buf)
{
FILE *in = NULL;
BlockNumber blknum = 0;
BlockNumber n_blocks_read = 0;
XLogRecPtr page_lsn = 0;
char read_buffer[BLCKSZ+1];
char in_buf[STDIO_BUFSIZE];
fio_header hdr;
fio_send_request *req = (fio_send_request*) buf;
char *from_fullpath = (char*) buf + sizeof(fio_send_request);
int current_pos = 0;
bool with_pagemap = req->bitmapsize > 0 ? true : false;
/* error reporting */
char *errormsg = NULL;
/* parse buffer */
datapagemap_t *map = NULL;
datapagemap_iterator_t *iter = NULL;
/* open source file */
in = fopen(from_fullpath, PG_BINARY_R);
if (!in)
{
hdr.cop = FIO_ERROR;
/* do not send exact wording of ENOENT error message
* because it is a very common error in our case, so
* error code is enough.
*/
if (errno == ENOENT)
{
hdr.arg = FILE_MISSING;
hdr.size = 0;
}
else
{
hdr.arg = OPEN_FAILED;
errormsg = pgut_malloc(ERRMSG_MAX_LEN);
/* Construct the error message */
snprintf(errormsg, ERRMSG_MAX_LEN, "Cannot open source file '%s': %s",
from_fullpath, strerror(errno));
hdr.size = strlen(errormsg) + 1;
}
/* send header and message */
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
if (errormsg)
IO_CHECK(fio_write_all(out, errormsg, hdr.size), hdr.size);
goto cleanup;
}
if (with_pagemap)
{
map = pgut_malloc(sizeof(datapagemap_t));
map->bitmapsize = req->bitmapsize;
map->bitmap = (char*) buf + sizeof(fio_send_request);
map->bitmap = (char*) buf + sizeof(fio_send_request) + req->path_len;
/* get first block */
iter = datapagemap_iterate(map);
datapagemap_next(iter, &blknum);
}
hdr.cop = FIO_PAGE;
setvbuf(in, NULL, _IONBF, BUFSIZ);
}
else
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
/* TODO: what is this barrier for? */
read_buffer[BLCKSZ] = 1; /* barrier */
while (blknum < req->nblocks)
{
int rc = 0;
int retry_attempts = PAGE_READ_ATTEMPTS;
int rc = 0;
size_t read_len = 0;
int retry_attempts = PAGE_READ_ATTEMPTS;
/* TODO: handle signals on the agent */
if (interrupted)
elog(ERROR, "Interrupted during remote page reading");
/* read page, check header and validate checksums */
/* TODO: libpq connection on the agent, so we can do ptrack
* magic right here.
*/
for (;;)
{
ssize_t read_len = pread(fd, read_buffer, BLCKSZ, blknum*BLCKSZ);
/* Optimize stdio buffer usage, fseek only when current position
* does not match the position of requested block.
*/
if (current_pos != blknum*BLCKSZ)
{
current_pos = blknum*BLCKSZ;
if (fseek(in, current_pos, SEEK_SET) != 0)
elog(ERROR, "fseek to position %u is failed on remote file '%s': %s",
current_pos, from_fullpath, strerror(errno));
}
read_len = fread(read_buffer, 1, BLCKSZ, in);
page_lsn = InvalidXLogRecPtr;
/* report eof */
if (read_len == 0)
goto eof;
current_pos += BLCKSZ;
/* report error */
else if (read_len < 0)
if (ferror(in))
{
/* TODO: better to report exact error message, not errno */
hdr.cop = FIO_ERROR;
hdr.arg = errno;
hdr.size = blknum;
hdr.arg = READ_FAILED;
errormsg = pgut_malloc(ERRMSG_MAX_LEN);
/* Construct the error message */
snprintf(errormsg, ERRMSG_MAX_LEN, "Cannot read block %u of '%s': %s",
blknum, from_fullpath, strerror(errno));
hdr.size = strlen(errormsg) + 1;
/* send header and message */
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, errormsg, hdr.size), hdr.size);
goto cleanup;
}
else if (read_len == BLCKSZ)
if (read_len == BLCKSZ)
{
rc = validate_one_page(read_buffer, req->segmentno + blknum,
InvalidXLogRecPtr, &page_lsn, req->checksumVersion);
@ -1519,6 +1597,9 @@ static void fio_send_pages_impl(int fd, int out, char* buf, bool with_pagemap)
else if (rc == PAGE_IS_VALID)
break;
}
if (feof(in))
goto eof;
// else /* readed less than BLKSZ bytes, retry */
/* File either has an insane header or an invalid checksum,
@ -1526,7 +1607,6 @@ static void fio_send_pages_impl(int fd, int out, char* buf, bool with_pagemap)
*/
if (--retry_attempts == 0)
{
char *errormsg = NULL;
hdr.cop = FIO_SEND_FILE_CORRUPTION;
hdr.arg = blknum;
@ -1547,7 +1627,6 @@ static void fio_send_pages_impl(int fd, int out, char* buf, bool with_pagemap)
if (errormsg)
IO_CHECK(fio_write_all(out, errormsg, hdr.size), hdr.size);
pg_free(errormsg);
goto cleanup;
}
}
@ -1567,6 +1646,7 @@ static void fio_send_pages_impl(int fd, int out, char* buf, bool with_pagemap)
BackupPageHeader* bph = (BackupPageHeader*)write_buffer;
/* compress page */
hdr.cop = FIO_PAGE;
hdr.arg = bph->block = blknum;
hdr.size = sizeof(BackupPageHeader);
@ -1608,6 +1688,9 @@ eof:
cleanup:
pg_free(map);
pg_free(iter);
pg_free(errormsg);
if (in)
fclose(in);
return;
}
@ -1621,7 +1704,8 @@ cleanup:
* ZLIB_ERROR (-5)
* REMOTE_ERROR (-6)
*/
int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, int thread_num)
int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out,
pgFile *file, char **errormsg)
{
fio_header hdr;
int exit_code = SEND_OK;
@ -1634,8 +1718,8 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
hdr.cop = FIO_SEND_FILE;
hdr.size = path_len;
elog(VERBOSE, "Thread [%d]: Attempting to open remote compressed WAL file '%s'",
thread_num, from_fullpath);
// elog(VERBOSE, "Thread [%d]: Attempting to open remote compressed WAL file '%s'",
// thread_num, from_fullpath);
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, from_fullpath, path_len), path_len);
@ -1655,7 +1739,8 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
if (hdr.size > 0)
{
IO_CHECK(fio_read_all(fio_stdin, in_buf, hdr.size), hdr.size);
elog(WARNING, "Thread [%d]: %s", thread_num, in_buf);
*errormsg = pgut_malloc(hdr.size);
snprintf(*errormsg, hdr.size, "%s", in_buf);
}
exit_code = hdr.arg;
goto cleanup;
@ -1681,8 +1766,10 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
if (rc != Z_OK)
{
elog(WARNING, "Thread [%d]: Failed to initialize decompression stream for file '%s': %i: %s",
thread_num, from_fullpath, rc, strm->msg);
*errormsg = pgut_malloc(ERRMSG_MAX_LEN);
snprintf(*errormsg, ERRMSG_MAX_LEN,
"Failed to initialize decompression stream for file '%s': %i: %s",
from_fullpath, rc, strm->msg);
exit_code = ZLIB_ERROR;
goto cleanup;
}
@ -1714,8 +1801,10 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
else if (rc != Z_OK)
{
/* got an error */
elog(WARNING, "Thread [%d]: Decompression failed for file '%s': %i: %s",
thread_num, from_fullpath, rc, strm->msg);
*errormsg = pgut_malloc(ERRMSG_MAX_LEN);
snprintf(*errormsg, ERRMSG_MAX_LEN,
"Decompression failed for file '%s': %i: %s",
from_fullpath, rc, strm->msg);
exit_code = ZLIB_ERROR;
goto cleanup;
}
@ -1725,8 +1814,6 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
/* Output buffer is full, write it out */
if (fwrite(out_buf, 1, OUT_BUF_SIZE, out) != OUT_BUF_SIZE)
{
elog(WARNING, "Thread [%d]: Cannot write to file '%s': %s",
thread_num, to_fullpath, strerror(errno));
exit_code = WRITE_FAILED;
goto cleanup;
}
@ -1743,20 +1830,13 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
if (fwrite(out_buf, 1, len, out) != len)
{
elog(WARNING, "Thread [%d]: Cannot write to file: %s",
thread_num, strerror(errno));
exit_code = WRITE_FAILED;
goto cleanup;
}
}
}
else
{
elog(WARNING, "Thread [%d]: Remote agent returned message of unexpected type: %i",
thread_num, hdr.cop);
exit_code = REMOTE_ERROR;
break;
}
elog(ERROR, "Remote agent returned message of unexpected type: %i", hdr.cop);
}
cleanup:
@ -1779,10 +1859,14 @@ cleanup:
* SEND_OK (0)
* FILE_MISSING (-1)
* OPEN_FAILED (-2)
* READ_FAIL (-3)
* WRITE_FAIL (-4)
* READ_FAILED (-3)
* WRITE_FAILED (-4)
*
* OPEN_FAILED and READ_FAILED should also set errormsg.
* If pgFile is not NULL then we must calculate crc and read_size for it.
*/
int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out, int thread_num)
int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
pgFile *file, char **errormsg)
{
fio_header hdr;
int exit_code = SEND_OK;
@ -1792,8 +1876,8 @@ int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
hdr.cop = FIO_SEND_FILE;
hdr.size = path_len;
elog(VERBOSE, "Thread [%d]: Attempting to open remote WAL file '%s'",
thread_num, from_fullpath);
// elog(VERBOSE, "Thread [%d]: Attempting to open remote WAL file '%s'",
// thread_num, from_fullpath);
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, from_fullpath, path_len), path_len);
@ -1813,7 +1897,8 @@ int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
if (hdr.size > 0)
{
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
elog(WARNING, "Thread [%d]: %s", thread_num, buf);
*errormsg = pgut_malloc(hdr.size);
snprintf(*errormsg, hdr.size, "%s", buf);
}
exit_code = hdr.arg;
break;
@ -1826,19 +1911,20 @@ int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
/* We have received a chunk of data, let's write it out */
if (fwrite(buf, 1, hdr.size, out) != hdr.size)
{
elog(WARNING, "Thread [%d]: Cannot write to file '%s': %s",
thread_num, to_fullpath, strerror(errno));
exit_code = WRITE_FAILED;
break;
}
if (file)
{
file->read_size += hdr.size;
COMP_FILE_CRC32(true, file->crc, buf, hdr.size);
}
}
else
{
/* TODO: fio_disconnect may get assert fail when running after this */
elog(WARNING, "Thread [%d]: Remote agent returned message of unexpected type: %i",
thread_num, hdr.cop);
exit_code = REMOTE_ERROR;
break;
elog(ERROR, "Remote agent returned message of unexpected type: %i", hdr.cop);
}
}
@ -1851,9 +1937,13 @@ int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
/* Send file content
* On error we return FIO_ERROR message with following codes
* FILE_MISSING (-1)
* OPEN_FAILED (-2)
* READ_FAILED (-3)
* FIO_ERROR:
* FILE_MISSING (-1)
* OPEN_FAILED (-2)
* READ_FAILED (-3)
*
* FIO_PAGE
* FIO_SEND_FILE_EOF
*
*/
static void fio_send_file_impl(int out, char const* path)
@ -1883,9 +1973,9 @@ static void fio_send_file_impl(int out, char const* path)
else
{
hdr.arg = OPEN_FAILED;
errormsg = pgut_malloc(MAXPGPATH);
errormsg = pgut_malloc(ERRMSG_MAX_LEN);
/* Construct the error message */
snprintf(errormsg, MAXPGPATH, "Cannot open source file '%s': %s", path, strerror(errno));
snprintf(errormsg, ERRMSG_MAX_LEN, "Cannot open file '%s': %s", path, strerror(errno));
hdr.size = strlen(errormsg) + 1;
}
@ -1909,10 +1999,10 @@ static void fio_send_file_impl(int out, char const* path)
if (ferror(fp))
{
hdr.cop = FIO_ERROR;
errormsg = pgut_malloc(MAXPGPATH);
errormsg = pgut_malloc(ERRMSG_MAX_LEN);
hdr.arg = READ_FAILED;
/* Construct the error message */
snprintf(errormsg, MAXPGPATH, "Cannot read source file '%s': %s", path, strerror(errno));
snprintf(errormsg, ERRMSG_MAX_LEN, "Cannot read from file '%s': %s", path, strerror(errno));
hdr.size = strlen(errormsg) + 1;
/* send header and message */
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
@ -2084,12 +2174,8 @@ void fio_communicate(int in, int out)
SYS_CHECK(ftruncate(fd[hdr.handle], hdr.arg));
break;
case FIO_SEND_PAGES:
Assert(hdr.size == sizeof(fio_send_request));
fio_send_pages_impl(fd[hdr.handle], out, buf, false);
break;
case FIO_SEND_PAGES_PAGEMAP:
// buf contain fio_send_request header and bitmap.
fio_send_pages_impl(fd[hdr.handle], out, buf, true);
fio_send_pages_impl(out, buf);
break;
case FIO_SEND_FILE:
fio_send_file_impl(out, buf);

View File

@ -37,7 +37,6 @@ typedef enum
FIO_GET_CRC32,
/* used in fio_send_pages */
FIO_SEND_PAGES,
FIO_SEND_PAGES_PAGEMAP,
FIO_ERROR,
FIO_SEND_FILE,
// FIO_CHUNK,