1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-12-12 11:45:24 +02:00

Remote backup works

This commit is contained in:
Konstantin Knizhnik 2018-11-04 23:20:07 +03:00
parent be4c4be4c2
commit be6a4e9bcb
14 changed files with 182 additions and 100 deletions

2
.gitignore vendored
View File

@ -38,7 +38,7 @@
/src/datapagemap.h /src/datapagemap.h
/src/logging.h /src/logging.h
/src/receivelog.c /src/receivelog.c
/src/receivelog.h /src/receivelog.hщщ
/src/streamutil.c /src/streamutil.c
/src/streamutil.h /src/streamutil.h
/src/xlogreader.c /src/xlogreader.c

View File

@ -943,6 +943,16 @@ do_backup(time_t start_time)
//elog(LOG, "Backup completed. Total bytes : " INT64_FORMAT "", //elog(LOG, "Backup completed. Total bytes : " INT64_FORMAT "",
// current.data_bytes); // current.data_bytes);
if (is_remote_agent)
fio_transfer(&current.start_time,current.start_time);
else
complete_backup();
return 0;
}
void complete_backup(void)
{
pgBackupValidate(&current); pgBackupValidate(&current);
elog(INFO, "Backup %s completed", base36enc(current.start_time)); elog(INFO, "Backup %s completed", base36enc(current.start_time));
@ -953,8 +963,6 @@ do_backup(time_t start_time)
*/ */
if (delete_expired || delete_wal) if (delete_expired || delete_wal)
do_retention_purge(); do_retention_purge();
return 0;
} }
/* /*
@ -1529,13 +1537,13 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
{ {
if (!file_exists) if (!file_exists)
{ {
file_exists = fileExists(wal_segment_path); file_exists = fileExists(wal_segment_path, is_start_lsn ? FIO_DB_HOST : FIO_BACKUP_HOST);
/* Try to find compressed WAL file */ /* Try to find compressed WAL file */
if (!file_exists) if (!file_exists)
{ {
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
file_exists = fileExists(gz_wal_segment_path); file_exists = fileExists(gz_wal_segment_path, is_start_lsn ? FIO_DB_HOST : FIO_BACKUP_HOST);
if (file_exists) if (file_exists)
elog(LOG, "Found compressed WAL segment: %s", wal_segment_path); elog(LOG, "Found compressed WAL segment: %s", wal_segment_path);
#endif #endif
@ -1822,7 +1830,6 @@ pg_stop_backup(pgBackup *backup)
PQerrorMessage(conn), stop_backup_query); PQerrorMessage(conn), stop_backup_query);
} }
elog(INFO, "pg_stop backup() successfully executed"); elog(INFO, "pg_stop backup() successfully executed");
sleep(20);
} }
backup_in_progress = false; backup_in_progress = false;
@ -2155,7 +2162,7 @@ backup_files(void *arg)
skip = true; /* ...skip copying file. */ skip = true; /* ...skip copying file. */
} }
if (skip || if (skip ||
!copy_file(arguments->from_root, arguments->to_root, file)) !copy_file(arguments->from_root, arguments->to_root, file, FIO_BACKUP_HOST))
{ {
file->write_size = BYTES_INVALID; file->write_size = BYTES_INVALID;
elog(VERBOSE, "File \"%s\" was not copied to backup", elog(VERBOSE, "File \"%s\" was not copied to backup",

View File

@ -769,13 +769,8 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
*/ */
if (allow_truncate && file->n_blocks != BLOCKNUM_INVALID && !need_truncate) if (allow_truncate && file->n_blocks != BLOCKNUM_INVALID && !need_truncate)
{ {
size_t file_size = 0; struct stat st;
if (fio_ffstat(out, &st) == 0 && st.st_size > file->n_blocks * BLCKSZ)
/* get file current size */
fio_fseek(out, 0);
file_size = ftell(out);
if (file_size > file->n_blocks * BLCKSZ)
{ {
truncate_from = file->n_blocks; truncate_from = file->n_blocks;
need_truncate = true; need_truncate = true;
@ -824,7 +819,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
* it is either small control file or already compressed cfs file. * it is either small control file or already compressed cfs file.
*/ */
bool bool
copy_file(const char *from_root, const char *to_root, pgFile *file) copy_file(const char *from_root, const char *to_root, pgFile *file, fio_location location)
{ {
char to_path[MAXPGPATH]; char to_path[MAXPGPATH];
FILE *in; FILE *in;
@ -858,7 +853,7 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
/* open backup file for write */ /* open backup file for write */
join_path_components(to_path, to_root, file->path + strlen(from_root) + 1); join_path_components(to_path, to_root, file->path + strlen(from_root) + 1);
out = fio_fopen(to_path, PG_BINARY_W, FIO_BACKUP_HOST); out = fio_fopen(to_path, PG_BINARY_W, location);
if (out == NULL) if (out == NULL)
{ {
int errno_tmp = errno; int errno_tmp = errno;
@ -932,7 +927,7 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
file->crc = crc; file->crc = crc;
/* update file permission */ /* update file permission */
if (fio_chmod(to_path, st.st_mode, FIO_BACKUP_HOST) == -1) if (fio_chmod(to_path, st.st_mode, location) == -1)
{ {
errno_tmp = errno; errno_tmp = errno;
fclose(in); fclose(in);
@ -1039,7 +1034,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
{ {
snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path);
if (!overwrite && fileExists(gz_to_path)) if (!overwrite && fileExists(gz_to_path, FIO_BACKUP_HOST))
elog(ERROR, "WAL segment \"%s\" already exists.", gz_to_path); elog(ERROR, "WAL segment \"%s\" already exists.", gz_to_path);
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path); snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path);
@ -1054,7 +1049,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
else else
#endif #endif
{ {
if (!overwrite && fileExists(to_path)) if (!overwrite && fileExists(to_path, FIO_BACKUP_HOST))
elog(ERROR, "WAL segment \"%s\" already exists.", to_path); elog(ERROR, "WAL segment \"%s\" already exists.", to_path);
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path);
@ -1335,9 +1330,8 @@ get_wal_file(const char *from_path, const char *to_path)
bool bool
calc_file_checksum(pgFile *file) calc_file_checksum(pgFile *file)
{ {
FILE *in; int in;
size_t read_len = 0; size_t read_len = 0;
int errno_tmp;
char buf[BLCKSZ]; char buf[BLCKSZ];
struct stat st; struct stat st;
pg_crc32 crc; pg_crc32 crc;
@ -1350,8 +1344,8 @@ calc_file_checksum(pgFile *file)
file->write_size = 0; file->write_size = 0;
/* open backup mode file for read */ /* open backup mode file for read */
in = fopen(file->path, PG_BINARY_R); in = fio_open(file->path, O_RDONLY|PG_BINARY, FIO_BACKUP_HOST);
if (in == NULL) if (in < 0)
{ {
FIN_TRADITIONAL_CRC32(crc); FIN_TRADITIONAL_CRC32(crc);
file->crc = crc; file->crc = crc;
@ -1365,16 +1359,16 @@ calc_file_checksum(pgFile *file)
} }
/* stat source file to change mode of destination file */ /* stat source file to change mode of destination file */
if (fstat(fileno(in), &st) == -1) if (fio_fstat(in, &st) == -1)
{ {
fclose(in); fio_close(in);
elog(ERROR, "cannot stat \"%s\": %s", file->path, elog(ERROR, "cannot stat \"%s\": %s", file->path,
strerror(errno)); strerror(errno));
} }
for (;;) for (;;)
{ {
read_len = fread(buf, 1, sizeof(buf), in); read_len = fio_read(in, buf, sizeof(buf));
if(read_len == 0) if(read_len == 0)
break; break;
@ -1386,19 +1380,11 @@ calc_file_checksum(pgFile *file)
file->read_size += read_len; file->read_size += read_len;
} }
errno_tmp = errno;
if (!feof(in))
{
fclose(in);
elog(ERROR, "cannot read backup mode file \"%s\": %s",
file->path, strerror(errno_tmp));
}
/* finish CRC calculation and store into pgFile */ /* finish CRC calculation and store into pgFile */
FIN_TRADITIONAL_CRC32(crc); FIN_TRADITIONAL_CRC32(crc);
file->crc = crc; file->crc = crc;
fclose(in); fio_close(in);
return true; return true;
} }

View File

@ -162,8 +162,8 @@ pgFileNew(const char *path, bool omit_symlink)
struct stat st; struct stat st;
pgFile *file; pgFile *file;
/* stat the file */ /* stat the file */
if ((omit_symlink ? stat(path, &st) : lstat(path, &st)) == -1) if (fio_stat(path, &st, omit_symlink, FIO_BACKUP_HOST) < 0)
{ {
/* file not found is not an error case */ /* file not found is not an error case */
if (errno == ENOENT) if (errno == ENOENT)
@ -403,7 +403,7 @@ dir_list_file(parray *files, const char *root, bool exclude, bool omit_symlink,
join_path_components(path, backup_instance_path, PG_BLACK_LIST); join_path_components(path, backup_instance_path, PG_BLACK_LIST);
/* List files with black list */ /* List files with black list */
if (root && pgdata && strcmp(root, pgdata) == 0 && fileExists(path)) if (root && pgdata && strcmp(root, pgdata) == 0 && fileExists(path, FIO_LOCAL_HOST))
{ {
FILE *black_list_file = NULL; FILE *black_list_file = NULL;
char buf[MAXPGPATH * 2]; char buf[MAXPGPATH * 2];
@ -935,7 +935,7 @@ create_data_directories(const char *data_dir, const char *backup_dir,
size_t i; size_t i;
char backup_database_dir[MAXPGPATH], char backup_database_dir[MAXPGPATH],
to_path[MAXPGPATH]; to_path[MAXPGPATH];
sleep(30);
dirs = parray_new(); dirs = parray_new();
if (extract_tablespaces) if (extract_tablespaces)
{ {
@ -1088,7 +1088,7 @@ read_tablespace_map(parray *files, const char *backup_dir)
join_path_components(map_path, db_path, PG_TABLESPACE_MAP_FILE); join_path_components(map_path, db_path, PG_TABLESPACE_MAP_FILE);
/* Exit if database/tablespace_map doesn't exist */ /* Exit if database/tablespace_map doesn't exist */
if (!fileExists(map_path)) if (!fileExists(map_path, FIO_LOCAL_HOST))
{ {
elog(LOG, "there is no file tablespace_map"); elog(LOG, "there is no file tablespace_map");
return; return;
@ -1490,11 +1490,11 @@ dir_is_empty(const char *path)
* Return true if the path is a existing regular file. * Return true if the path is a existing regular file.
*/ */
bool bool
fileExists(const char *path) fileExists(const char *path, fio_location location)
{ {
struct stat buf; struct stat buf;
if (stat(path, &buf) == -1 && errno == ENOENT) if (fio_stat(path, &buf, true, location) == -1 && errno == ENOENT)
return false; return false;
else if (!S_ISREG(buf.st_mode)) else if (!S_ISREG(buf.st_mode))
return false; return false;

View File

@ -43,7 +43,7 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe)
fullpath, strerror(errno)); fullpath, strerror(errno));
} }
if (fio_stat(fd, &statbuf) < 0) if (fio_fstat(fd, &statbuf) < 0)
{ {
if (safe) if (safe)
return NULL; return NULL;

View File

@ -448,7 +448,7 @@ merge_files(void *arg)
/* /*
* We need to decompress target file only if it exists. * We need to decompress target file only if it exists.
*/ */
if (fileExists(to_path_tmp)) if (fileExists(to_path_tmp, FIO_LOCAL_HOST))
{ {
/* /*
* file->path points to the file in from_root directory. But we * file->path points to the file in from_root directory. But we

View File

@ -86,22 +86,22 @@ static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime
typedef struct XLogPageReadPrivate typedef struct XLogPageReadPrivate
{ {
int thread_num; int thread_num;
const char *archivedir; const char *archivedir;
TimeLineID tli; TimeLineID tli;
uint32 xlog_seg_size; uint32 xlog_seg_size;
bool manual_switch; bool manual_switch;
bool need_switch; bool need_switch;
int xlogfile;
XLogSegNo xlogsegno;
char xlogpath[MAXPGPATH];
bool xlogexists;
int xlogfile;
XLogSegNo xlogsegno;
char xlogpath[MAXPGPATH];
bool xlogexists;
fio_location location;
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
gzFile gz_xlogfile; gzFile gz_xlogfile;
char gz_xlogpath[MAXPGPATH]; char gz_xlogpath[MAXPGPATH];
#endif #endif
} XLogPageReadPrivate; } XLogPageReadPrivate;
@ -853,7 +853,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
snprintf(private_data->xlogpath, MAXPGPATH, "%s/%s", snprintf(private_data->xlogpath, MAXPGPATH, "%s/%s",
private_data->archivedir, xlogfname); private_data->archivedir, xlogfname);
if (fileExists(private_data->xlogpath)) if (fileExists(private_data->xlogpath, private_data->location))
{ {
elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"", elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"",
private_data->thread_num, private_data->thread_num,
@ -861,7 +861,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
private_data->xlogexists = true; private_data->xlogexists = true;
private_data->xlogfile = fio_open(private_data->xlogpath, private_data->xlogfile = fio_open(private_data->xlogpath,
O_RDONLY | PG_BINARY, FIO_DB_HOST); O_RDONLY | PG_BINARY, private_data->location);
if (private_data->xlogfile < 0) if (private_data->xlogfile < 0)
{ {
@ -879,7 +879,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
snprintf(private_data->gz_xlogpath, snprintf(private_data->gz_xlogpath,
sizeof(private_data->gz_xlogpath), "%s.gz", sizeof(private_data->gz_xlogpath), "%s.gz",
private_data->xlogpath); private_data->xlogpath);
if (fileExists(private_data->gz_xlogpath)) if (fileExists(private_data->gz_xlogpath, private_data->location))
{ {
elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"", elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"",
private_data->thread_num, private_data->gz_xlogpath); private_data->thread_num, private_data->gz_xlogpath);
@ -965,6 +965,7 @@ InitXLogPageRead(XLogPageReadPrivate *private_data, const char *archivedir,
private_data->tli = tli; private_data->tli = tli;
private_data->xlog_seg_size = xlog_seg_size; private_data->xlog_seg_size = xlog_seg_size;
private_data->xlogfile = -1; private_data->xlogfile = -1;
private_data->location = FIO_BACKUP_HOST;
if (allocate_reader) if (allocate_reader)
{ {

View File

@ -389,7 +389,11 @@ main(int argc, char *argv[])
fio_redirect(STDIN_FILENO, STDOUT_FILENO); fio_redirect(STDIN_FILENO, STDOUT_FILENO);
} else { } else {
/* Execute remote probackup */ /* Execute remote probackup */
remote_execute(argc, argv, backup_subcmd == BACKUP_CMD); int status = remote_execute(argc, argv, backup_subcmd == BACKUP_CMD);
if (status != 0)
{
return status;
}
} }
} }
@ -537,13 +541,22 @@ main(int argc, char *argv[])
case INIT_CMD: case INIT_CMD:
return do_init(); return do_init();
case BACKUP_CMD: case BACKUP_CMD:
current.stream = stream_wal;
if (ssh_host && !is_remote_agent)
{
current.status = BACKUP_STATUS_DONE;
StrNCpy(current.program_version, PROGRAM_VERSION,
sizeof(current.program_version));
complete_backup();
return 0;
}
else
{ {
const char *backup_mode; const char *backup_mode;
time_t start_time; time_t start_time;
start_time = time(NULL); start_time = time(NULL);
backup_mode = deparse_backup_mode(current.backup_mode); backup_mode = deparse_backup_mode(current.backup_mode);
current.stream = stream_wal;
elog(INFO, "Backup start, pg_probackup version: %s, backup ID: %s, backup mode: %s, instance: %s, stream: %s, remote: %s", elog(INFO, "Backup start, pg_probackup version: %s, backup ID: %s, backup mode: %s, instance: %s, stream: %s, remote: %s",
PROGRAM_VERSION, base36enc(start_time), backup_mode, instance_name, PROGRAM_VERSION, base36enc(start_time), backup_mode, instance_name,

View File

@ -378,6 +378,7 @@ extern const char *pgdata_exclude_dir[];
/* in backup.c */ /* in backup.c */
extern int do_backup(time_t start_time); extern int do_backup(time_t start_time);
extern void complete_backup(void);
extern BackupMode parse_backup_mode(const char *value); extern BackupMode parse_backup_mode(const char *value);
extern const char *deparse_backup_mode(BackupMode mode); extern const char *deparse_backup_mode(BackupMode mode);
extern void process_block_change(ForkNumber forknum, RelFileNode rnode, extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
@ -475,7 +476,7 @@ extern pgBackup* find_parent_full_backup(pgBackup *current_backup);
extern int scan_parent_chain(pgBackup *current_backup, pgBackup **result_backup); extern int scan_parent_chain(pgBackup *current_backup, pgBackup **result_backup);
extern bool is_parent(time_t parent_backup_time, pgBackup *child_backup, bool inclusive); extern bool is_parent(time_t parent_backup_time, pgBackup *child_backup, bool inclusive);
extern int get_backup_index_number(parray *backup_list, pgBackup *backup); extern int get_backup_index_number(parray *backup_list, pgBackup *backup);
extern void remote_execute(int argc, char *argv[], bool do_backup); extern int remote_execute(int argc, char *argv[], bool do_backup);
#define COMPRESS_ALG_DEFAULT NOT_DEFINED_COMPRESS #define COMPRESS_ALG_DEFAULT NOT_DEFINED_COMPRESS
#define COMPRESS_LEVEL_DEFAULT 1 #define COMPRESS_LEVEL_DEFAULT 1
@ -501,7 +502,7 @@ extern parray *dir_read_file_list(const char *root, const char *file_txt);
extern int dir_create_dir(const char *path, mode_t mode); extern int dir_create_dir(const char *path, mode_t mode);
extern bool dir_is_empty(const char *path); extern bool dir_is_empty(const char *path);
extern bool fileExists(const char *path); extern bool fileExists(const char *path, fio_location location);
extern size_t pgFileSize(const char *path); extern size_t pgFileSize(const char *path);
extern pgFile *pgFileNew(const char *path, bool omit_symlink); extern pgFile *pgFileNew(const char *path, bool omit_symlink);
@ -523,7 +524,7 @@ extern bool backup_data_file(backup_files_arg* arguments,
extern void restore_data_file(const char *to_path, extern void restore_data_file(const char *to_path,
pgFile *file, bool allow_truncate, pgFile *file, bool allow_truncate,
bool write_header); bool write_header);
extern bool copy_file(const char *from_root, const char *to_root, pgFile *file); extern bool copy_file(const char *from_root, const char *to_root, pgFile *file, fio_location location);
extern void move_file(const char *from_root, const char *to_root, pgFile *file); extern void move_file(const char *from_root, const char *to_root, pgFile *file);
extern void push_wal_file(const char *from_path, const char *to_path, extern void push_wal_file(const char *from_path, const char *to_path,
bool is_compress, bool overwrite); bool is_compress, bool overwrite);

View File

@ -587,14 +587,14 @@ restore_files(void *arg)
(arguments->backup->backup_mode == BACKUP_MODE_DIFF_PAGE (arguments->backup->backup_mode == BACKUP_MODE_DIFF_PAGE
|| arguments->backup->backup_mode == BACKUP_MODE_DIFF_PTRACK)) || arguments->backup->backup_mode == BACKUP_MODE_DIFF_PTRACK))
{ {
elog(VERBOSE, "The file didn`t change. Skip restore: %s", file->path); elog(INFO, "The file didn`t change. Skip restore: %s", file->path);
continue; continue;
} }
/* Directories were created before */ /* Directories were created before */
if (S_ISDIR(file->mode)) if (S_ISDIR(file->mode))
{ {
elog(VERBOSE, "directory, skip"); elog(INFO, "directory, skip");
continue; continue;
} }
@ -611,7 +611,7 @@ restore_files(void *arg)
* block and have BackupPageHeader meta information, so we cannot just * block and have BackupPageHeader meta information, so we cannot just
* copy the file from backup. * copy the file from backup.
*/ */
elog(VERBOSE, "Restoring file %s, is_datafile %i, is_cfs %i", elog(INFO, "Restoring file %s, is_datafile %i, is_cfs %i",
file->path, file->is_datafile?1:0, file->is_cfs?1:0); file->path, file->is_datafile?1:0, file->is_cfs?1:0);
if (file->is_datafile && !file->is_cfs) if (file->is_datafile && !file->is_cfs)
{ {
@ -624,7 +624,7 @@ restore_files(void *arg)
false); false);
} }
else else
copy_file(from_root, pgdata, file); copy_file(from_root, pgdata, file, FIO_DB_HOST);
/* print size of restored file */ /* print size of restored file */
if (file->write_size != BYTES_INVALID) if (file->write_size != BYTES_INVALID)

View File

@ -377,13 +377,20 @@ ssize_t fio_read(int fd, void* buf, size_t size)
} }
} }
int fio_stat(int fd, struct stat* st) int fio_ffstat(FILE* f, struct stat* st)
{
return fio_is_remote_file(f)
? fio_fstat(fio_fileno(f), st)
: fio_fstat(fileno(f), st);
}
int fio_fstat(int fd, struct stat* st)
{ {
if (fio_is_remote_fd(fd)) if (fio_is_remote_fd(fd))
{ {
fio_header hdr; fio_header hdr;
hdr.cop = FIO_STAT; hdr.cop = FIO_FSTAT;
hdr.handle = fd & ~FIO_PIPE_MARKER; hdr.handle = fd & ~FIO_PIPE_MARKER;
hdr.size = 0; hdr.size = 0;
@ -394,6 +401,40 @@ int fio_stat(int fd, struct stat* st)
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_FSTAT);
IO_CHECK(fio_read_all(fio_stdin, st, sizeof(*st)), sizeof(*st));
SYS_CHECK(pthread_mutex_unlock(&fio_read_mutex));
return hdr.arg;
}
else
{
return fstat(fd, st);
}
}
int fio_stat(char const* path, struct stat* st, bool follow_symlinks, fio_location location)
{
if (fio_is_remote(location))
{
fio_header hdr;
size_t path_len = strlen(path) + 1;
hdr.cop = FIO_STAT;
hdr.handle = -1;
hdr.arg = follow_symlinks;
hdr.size = path_len;
SYS_CHECK(pthread_mutex_lock(&fio_read_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len);
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr)); IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_STAT); Assert(hdr.cop == FIO_STAT);
IO_CHECK(fio_read_all(fio_stdin, st, sizeof(*st)), sizeof(*st)); IO_CHECK(fio_read_all(fio_stdin, st, sizeof(*st)), sizeof(*st));
@ -404,7 +445,7 @@ int fio_stat(int fd, struct stat* st)
} }
else else
{ {
return fstat(fd, st); return follow_symlinks ? stat(path, st) : lstat(path, st);
} }
} }
@ -566,12 +607,32 @@ static void fio_send_file(int out, char const* path)
} }
} }
void fio_transfer(void* addr, size_t value)
{
struct {
fio_header hdr;
fio_binding bind;
} msg;
msg.hdr.cop = FIO_TRANSFER;
msg.hdr.size = sizeof(fio_binding);
msg.bind.address = (size_t*)addr;
msg.bind.value = value;
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
IO_CHECK(fio_write_all(fio_stdout, &msg, sizeof(msg)), sizeof(msg));
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
}
void fio_communicate(int in, int out) void fio_communicate(int in, int out)
{ {
int fd[FIO_FDMAX]; int fd[FIO_FDMAX];
size_t buf_size = 128*1024; size_t buf_size = 128*1024;
char* buf = (char*)malloc(buf_size); char* buf = (char*)malloc(buf_size);
fio_header hdr; fio_header hdr;
struct stat st;
int rc; int rc;
while ((rc = fio_read_all(in, &hdr, sizeof hdr)) == sizeof(hdr)) { while ((rc = fio_read_all(in, &hdr, sizeof hdr)) == sizeof(hdr)) {
@ -606,15 +667,18 @@ void fio_communicate(int in, int out)
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, buf, rc), rc); IO_CHECK(fio_write_all(out, buf, rc), rc);
break; break;
case FIO_FSTAT:
hdr.size = sizeof(st);
hdr.arg = fstat(fd[hdr.handle], &st);
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, &st, sizeof(st)), sizeof(st));
break;
case FIO_STAT: case FIO_STAT:
{ hdr.size = sizeof(st);
struct stat st; hdr.arg = hdr.arg ? stat(buf, &st) : lstat(buf, &st);
hdr.size = sizeof(st); IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
hdr.arg = fstat(fd[hdr.handle], &st); IO_CHECK(fio_write_all(out, &st, sizeof(st)), sizeof(st));
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); break;
IO_CHECK(fio_write_all(out, &st, sizeof(st)), sizeof(st));
break;
}
case FIO_ACCESS: case FIO_ACCESS:
hdr.size = 0; hdr.size = 0;
hdr.arg = access(buf, hdr.arg); hdr.arg = access(buf, hdr.arg);
@ -638,6 +702,9 @@ void fio_communicate(int in, int out)
case FIO_TRUNCATE: case FIO_TRUNCATE:
SYS_CHECK(ftruncate(fd[hdr.handle], hdr.arg)); SYS_CHECK(ftruncate(fd[hdr.handle], hdr.arg));
break; break;
case FIO_TRANSFER:
*((fio_binding*)buf)->address = ((fio_binding*)buf)->value;
break;
default: default:
Assert(false); Assert(false);
} }

View File

@ -17,12 +17,15 @@ typedef enum
FIO_READ, FIO_READ,
FIO_LOAD, FIO_LOAD,
FIO_STAT, FIO_STAT,
FIO_FSTAT,
FIO_SEND, FIO_SEND,
FIO_ACCESS FIO_ACCESS,
FIO_TRANSFER
} fio_operations; } fio_operations;
typedef enum typedef enum
{ {
FIO_LOCAL_HOST,
FIO_DB_HOST, FIO_DB_HOST,
FIO_BACKUP_HOST FIO_BACKUP_HOST
} fio_location; } fio_location;
@ -33,13 +36,20 @@ typedef enum
#define SYS_CHECK(cmd) do if ((cmd) < 0) { perror(#cmd); exit(EXIT_FAILURE); } while (0) #define SYS_CHECK(cmd) do if ((cmd) < 0) { perror(#cmd); exit(EXIT_FAILURE); } while (0)
#define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (size)) { fprintf(stderr, "%s:%d: proceeds %d bytes instead of %d\n", __FILE__, __LINE__, _rc, (int)(size)); exit(EXIT_FAILURE); } } while (0) #define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (size)) { fprintf(stderr, "%s:%d: proceeds %d bytes instead of %d\n", __FILE__, __LINE__, _rc, (int)(size)); exit(EXIT_FAILURE); } } while (0)
typedef struct { typedef struct
{
unsigned cop : 4; unsigned cop : 4;
unsigned handle : 8; unsigned handle : 8;
unsigned size : 20; unsigned size : 20;
unsigned arg; unsigned arg;
} fio_header; } fio_header;
typedef struct
{
size_t* address;
size_t value;
} fio_binding;
extern void fio_redirect(int in, int out); extern void fio_redirect(int in, int out);
extern void fio_communicate(int in, int out); extern void fio_communicate(int in, int out);
@ -51,13 +61,14 @@ extern int fio_fflush(FILE* f);
extern int fio_fseek(FILE* f, off_t offs); extern int fio_fseek(FILE* f, off_t offs);
extern int fio_ftruncate(FILE* f, off_t size); extern int fio_ftruncate(FILE* f, off_t size);
extern int fio_fclose(FILE* f); extern int fio_fclose(FILE* f);
extern int fio_ffstat(FILE* f, struct stat* st);
extern int fio_open(char const* name, int mode, fio_location location); extern int fio_open(char const* name, int mode, fio_location location);
extern ssize_t fio_write(int fd, void const* buf, size_t size); extern ssize_t fio_write(int fd, void const* buf, size_t size);
extern ssize_t fio_read(int fd, void* buf, size_t size); extern ssize_t fio_read(int fd, void* buf, size_t size);
extern int fio_flush(int fd); extern int fio_flush(int fd);
extern int fio_seek(int fd, off_t offs); extern int fio_seek(int fd, off_t offs);
extern int fio_stat(int fd, struct stat* st); extern int fio_fstat(int fd, struct stat* st);
extern int fio_truncate(int fd, off_t size); extern int fio_truncate(int fd, off_t size);
extern int fio_close(int fd); extern int fio_close(int fd);
@ -66,9 +77,12 @@ extern int fio_unlink(char const* path, fio_location location);
extern int fio_mkdir(char const* path, int mode, fio_location location); extern int fio_mkdir(char const* path, int mode, fio_location location);
extern int fio_chmod(char const* path, int mode, fio_location location); extern int fio_chmod(char const* path, int mode, fio_location location);
extern int fio_access(char const* path, int mode, fio_location location); extern int fio_access(char const* path, int mode, fio_location location);
extern int fio_stat(char const* path, struct stat* st, bool follow_symlinks, fio_location location);
extern FILE* fio_open_stream(char const* name, fio_location location); extern FILE* fio_open_stream(char const* name, fio_location location);
extern int fio_close_stream(FILE* f); extern int fio_close_stream(FILE* f);
extern void fio_transfer(void* addr, size_t value);
#endif #endif

View File

@ -21,7 +21,7 @@ static int append_option(char* buf, size_t buf_size, size_t dst, char const* src
return dst + len + 1; return dst + len + 1;
} }
void remote_execute(int argc, char* argv[], bool listen) int remote_execute(int argc, char* argv[], bool listen)
{ {
char cmd[MAX_CMDLINE_LENGTH]; char cmd[MAX_CMDLINE_LENGTH];
size_t dst = 0; size_t dst = 0;
@ -66,6 +66,7 @@ void remote_execute(int argc, char* argv[], bool listen)
SYS_CHECK(close(outfd[1])); SYS_CHECK(close(outfd[1]));
SYS_CHECK(execvp(ssh_argv[0], ssh_argv)); SYS_CHECK(execvp(ssh_argv[0], ssh_argv));
return -1;
} else { } else {
SYS_CHECK(close(outfd[0])); /* These are being used by the child */ SYS_CHECK(close(outfd[0])); /* These are being used by the child */
SYS_CHECK(close(infd[1])); SYS_CHECK(close(infd[1]));
@ -74,9 +75,10 @@ void remote_execute(int argc, char* argv[], bool listen)
int status; int status;
fio_communicate(infd[0], outfd[1]); fio_communicate(infd[0], outfd[1]);
SYS_CHECK(wait(&status)); SYS_CHECK(wait(&status));
exit(status); return status;
} else { } else {
fio_redirect(infd[0], outfd[1]); /* write to stdout */ fio_redirect(infd[0], outfd[1]); /* write to stdout */
return 0;
} }
} }
} }

View File

@ -305,23 +305,14 @@ dir_get_file_size(const char *pathname)
{ {
struct stat statbuf; struct stat statbuf;
static char tmppath[MAXPGPATH]; static char tmppath[MAXPGPATH];
int fd;
snprintf(tmppath, sizeof(tmppath), "%s/%s", snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname); dir_data->basedir, pathname);
fd = fio_open(tmppath, O_RDONLY|PG_BINARY, FIO_BACKUP_HOST); if (fio_stat(tmppath, &statbuf, true, FIO_BACKUP_HOST) != 0)
if (fd >= 0) return -1;
{
if (fio_stat(fd, &statbuf) != 0) return statbuf.st_size;
{
fio_close(fd);
return -1;
}
fio_close(fd);
return statbuf.st_size;
}
return -1;
} }
static bool static bool