1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-17 21:18:00 +02:00

Fix rermote backup protocol

This commit is contained in:
Konstantin Knizhnik 2018-11-04 10:02:26 +03:00
parent ca5c4b09c7
commit be4c4be4c2
14 changed files with 913 additions and 242 deletions

2
.gitignore vendored
View File

@ -42,5 +42,3 @@
/src/streamutil.c
/src/streamutil.h
/src/xlogreader.c
/src/walmethods.c
/src/walmethods.h

View File

@ -11,13 +11,13 @@ OBJS += src/archive.o src/backup.o src/catalog.o src/configure.o src/data.o \
# borrowed files
OBJS += src/pg_crc.o src/datapagemap.o src/receivelog.o src/streamutil.o \
src/xlogreader.o
src/xlogreader.o src/walmethods.o
EXTRA_CLEAN = src/pg_crc.c src/datapagemap.c src/datapagemap.h src/logging.h \
src/receivelog.c src/receivelog.h src/streamutil.c src/streamutil.h \
src/xlogreader.c
INCLUDES = src/datapagemap.h src/logging.h src/streamutil.h src/receivelog.h
INCLUDES = src/datapagemap.h src/logging.h src/streamutil.h src/receivelog.h src/walmethods.h
ifdef USE_PGXS
PG_CONFIG = pg_config
@ -39,12 +39,6 @@ else
srchome=$(top_srcdir)
endif
ifeq (,$(filter 9.5 9.6,$(MAJORVERSION)))
OBJS += src/walmethods.o
EXTRA_CLEAN += src/walmethods.c src/walmethods.h
INCLUDES += src/walmethods.h
endif
PG_CPPFLAGS = -I$(libpq_srcdir) ${PTHREAD_CFLAGS} -Isrc -I$(top_srcdir)/$(subdir)/src
override CPPFLAGS := -DFRONTEND $(CPPFLAGS) $(PG_CPPFLAGS)
PG_LIBS = $(libpq_pgport) ${PTHREAD_CFLAGS}
@ -73,13 +67,6 @@ src/xlogreader.c: $(top_srcdir)/src/backend/access/transam/xlogreader.c
rm -f $@ && $(LN_S) $(srchome)/src/backend/access/transam/xlogreader.c $@
ifeq (,$(filter 9.5 9.6,$(MAJORVERSION)))
src/walmethods.c: $(top_srcdir)/src/bin/pg_basebackup/walmethods.c
rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_basebackup/walmethods.c $@
src/walmethods.h: $(top_srcdir)/src/bin/pg_basebackup/walmethods.h
rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_basebackup/walmethods.h $@
endif
ifeq ($(PORTNAME), aix)
CC=xlc_r
endif

View File

@ -1822,6 +1822,7 @@ pg_stop_backup(pgBackup *backup)
PQerrorMessage(conn), stop_backup_query);
}
elog(INFO, "pg_stop backup() successfully executed");
sleep(20);
}
backup_in_progress = false;
@ -1848,15 +1849,15 @@ pg_stop_backup(pgBackup *backup)
/* Write backup_label */
join_path_components(backup_label, path, PG_BACKUP_LABEL_FILE);
fp = fio_open(backup_label, PG_BINARY_W, FIO_BACKUP_HOST);
fp = fio_fopen(backup_label, PG_BINARY_W, FIO_BACKUP_HOST);
if (fp == NULL)
elog(ERROR, "can't open backup label file \"%s\": %s",
backup_label, strerror(errno));
len = strlen(PQgetvalue(res, 0, 3));
if (fio_write(fp, PQgetvalue(res, 0, 3), len) != len ||
fio_flush(fp) != 0 ||
fio_close(fp))
if (fio_fwrite(fp, PQgetvalue(res, 0, 3), len) != len ||
fio_fflush(fp) != 0 ||
fio_fclose(fp))
elog(ERROR, "can't write backup label file \"%s\": %s",
backup_label, strerror(errno));
@ -1895,15 +1896,15 @@ pg_stop_backup(pgBackup *backup)
char tablespace_map[MAXPGPATH];
join_path_components(tablespace_map, path, PG_TABLESPACE_MAP_FILE);
fp = fio_open(tablespace_map, PG_BINARY_W, FIO_BACKUP_HOST);
fp = fio_fopen(tablespace_map, PG_BINARY_W, FIO_BACKUP_HOST);
if (fp == NULL)
elog(ERROR, "can't open tablespace map file \"%s\": %s",
tablespace_map, strerror(errno));
len = strlen(val);
if (fio_write(fp, val, len) != len ||
fio_flush(fp) != 0 ||
fio_close(fp))
if (fio_fwrite(fp, val, len) != len ||
fio_fflush(fp) != 0 ||
fio_fclose(fp))
elog(ERROR, "can't write tablespace map file \"%s\": %s",
tablespace_map, strerror(errno));

View File

@ -26,8 +26,8 @@ static void
unlink_lock_atexit(void)
{
int res;
res = unlink(lock_file);
if (res != 0 && res != ENOENT)
res = fio_unlink(lock_file, FIO_BACKUP_HOST);
if (res != 0 && errno != ENOENT)
elog(WARNING, "%s: %s", lock_file, strerror(errno));
}
@ -90,7 +90,7 @@ catalog_lock(void)
* Think not to make the file protection weaker than 0600. See
* comments below.
*/
fd = open(lock_file, O_RDWR | O_CREAT | O_EXCL, 0600);
fd = fio_open(lock_file, O_RDWR | O_CREAT | O_EXCL, FIO_BACKUP_HOST);
if (fd >= 0)
break; /* Success; exit the retry loop */
@ -105,7 +105,7 @@ catalog_lock(void)
* Read the file to get the old owner's PID. Note race condition
* here: file might have been deleted since we tried to create it.
*/
fd = open(lock_file, O_RDONLY, 0600);
fd = fio_open(lock_file, O_RDONLY, FIO_BACKUP_HOST);
if (fd < 0)
{
if (errno == ENOENT)
@ -113,10 +113,10 @@ catalog_lock(void)
elog(ERROR, "could not open lock file \"%s\": %s",
lock_file, strerror(errno));
}
if ((len = read(fd, buffer, sizeof(buffer) - 1)) < 0)
if ((len = fio_read(fd, buffer, sizeof(buffer) - 1)) < 0)
elog(ERROR, "could not read lock file \"%s\": %s",
lock_file, strerror(errno));
close(fd);
fio_close(fd);
if (len == 0)
elog(ERROR, "lock file \"%s\" is empty", lock_file);
@ -149,7 +149,7 @@ catalog_lock(void)
* it. Need a loop because of possible race condition against other
* would-be creators.
*/
if (unlink(lock_file) < 0)
if (fio_unlink(lock_file, FIO_BACKUP_HOST) < 0)
elog(ERROR, "could not remove old lock file \"%s\": %s",
lock_file, strerror(errno));
}
@ -160,32 +160,32 @@ catalog_lock(void)
snprintf(buffer, sizeof(buffer), "%d\n", my_pid);
errno = 0;
if (write(fd, buffer, strlen(buffer)) != strlen(buffer))
if (fio_write(fd, buffer, strlen(buffer)) != strlen(buffer))
{
int save_errno = errno;
close(fd);
unlink(lock_file);
fio_close(fd);
fio_unlink(lock_file, FIO_BACKUP_HOST);
/* if write didn't set errno, assume problem is no disk space */
errno = save_errno ? save_errno : ENOSPC;
elog(ERROR, "could not write lock file \"%s\": %s",
lock_file, strerror(errno));
}
if (fsync(fd) != 0)
if (fio_flush(fd) != 0)
{
int save_errno = errno;
close(fd);
unlink(lock_file);
fio_close(fd);
fio_unlink(lock_file, FIO_BACKUP_HOST);
errno = save_errno;
elog(ERROR, "could not write lock file \"%s\": %s",
lock_file, strerror(errno));
}
if (close(fd) != 0)
if (fio_close(fd) != 0)
{
int save_errno = errno;
unlink(lock_file);
fio_unlink(lock_file, FIO_BACKUP_HOST);
errno = save_errno;
elog(ERROR, "could not write lock file \"%s\": %s",
lock_file, strerror(errno));
@ -434,46 +434,46 @@ pgBackupWriteControl(FILE *out, pgBackup *backup)
{
char timestamp[100];
fio_printf(out, "#Configuration\n");
fio_printf(out, "backup-mode = %s\n", pgBackupGetBackupMode(backup));
fio_printf(out, "stream = %s\n", backup->stream ? "true" : "false");
fio_printf(out, "compress-alg = %s\n",
fio_fprintf(out, "#Configuration\n");
fio_fprintf(out, "backup-mode = %s\n", pgBackupGetBackupMode(backup));
fio_fprintf(out, "stream = %s\n", backup->stream ? "true" : "false");
fio_fprintf(out, "compress-alg = %s\n",
deparse_compress_alg(backup->compress_alg));
fio_printf(out, "compress-level = %d\n", backup->compress_level);
fio_printf(out, "from-replica = %s\n", backup->from_replica ? "true" : "false");
fio_fprintf(out, "compress-level = %d\n", backup->compress_level);
fio_fprintf(out, "from-replica = %s\n", backup->from_replica ? "true" : "false");
fio_printf(out, "\n#Compatibility\n");
fio_printf(out, "block-size = %u\n", backup->block_size);
fio_printf(out, "xlog-block-size = %u\n", backup->wal_block_size);
fio_printf(out, "checksum-version = %u\n", backup->checksum_version);
fio_fprintf(out, "\n#Compatibility\n");
fio_fprintf(out, "block-size = %u\n", backup->block_size);
fio_fprintf(out, "xlog-block-size = %u\n", backup->wal_block_size);
fio_fprintf(out, "checksum-version = %u\n", backup->checksum_version);
if (backup->program_version[0] != '\0')
fio_printf(out, "program-version = %s\n", backup->program_version);
fio_fprintf(out, "program-version = %s\n", backup->program_version);
if (backup->server_version[0] != '\0')
fio_printf(out, "server-version = %s\n", backup->server_version);
fio_fprintf(out, "server-version = %s\n", backup->server_version);
fio_printf(out, "\n#Result backup info\n");
fio_printf(out, "timelineid = %d\n", backup->tli);
fio_fprintf(out, "\n#Result backup info\n");
fio_fprintf(out, "timelineid = %d\n", backup->tli);
/* LSN returned by pg_start_backup */
fio_printf(out, "start-lsn = %X/%X\n",
fio_fprintf(out, "start-lsn = %X/%X\n",
(uint32) (backup->start_lsn >> 32),
(uint32) backup->start_lsn);
/* LSN returned by pg_stop_backup */
fio_printf(out, "stop-lsn = %X/%X\n",
fio_fprintf(out, "stop-lsn = %X/%X\n",
(uint32) (backup->stop_lsn >> 32),
(uint32) backup->stop_lsn);
time2iso(timestamp, lengthof(timestamp), backup->start_time);
fio_printf(out, "start-time = '%s'\n", timestamp);
fio_fprintf(out, "start-time = '%s'\n", timestamp);
if (backup->end_time > 0)
{
time2iso(timestamp, lengthof(timestamp), backup->end_time);
fio_printf(out, "end-time = '%s'\n", timestamp);
fio_fprintf(out, "end-time = '%s'\n", timestamp);
}
fio_printf(out, "recovery-xid = " XID_FMT "\n", backup->recovery_xid);
fio_fprintf(out, "recovery-xid = " XID_FMT "\n", backup->recovery_xid);
if (backup->recovery_time > 0)
{
time2iso(timestamp, lengthof(timestamp), backup->recovery_time);
fio_printf(out, "recovery-time = '%s'\n", timestamp);
fio_fprintf(out, "recovery-time = '%s'\n", timestamp);
}
/*
@ -481,20 +481,20 @@ pgBackupWriteControl(FILE *out, pgBackup *backup)
* WAL segments in archive 'wal' directory.
*/
if (backup->data_bytes != BYTES_INVALID)
fio_printf(out, "data-bytes = " INT64_FORMAT "\n", backup->data_bytes);
fio_fprintf(out, "data-bytes = " INT64_FORMAT "\n", backup->data_bytes);
if (backup->wal_bytes != BYTES_INVALID)
fio_printf(out, "wal-bytes = " INT64_FORMAT "\n", backup->wal_bytes);
fio_fprintf(out, "wal-bytes = " INT64_FORMAT "\n", backup->wal_bytes);
fio_printf(out, "status = %s\n", status2str(backup->status));
fio_fprintf(out, "status = %s\n", status2str(backup->status));
/* 'parent_backup' is set if it is incremental backup */
if (backup->parent_backup != 0)
fio_printf(out, "parent-backup-id = '%s'\n", base36enc(backup->parent_backup));
fio_fprintf(out, "parent-backup-id = '%s'\n", base36enc(backup->parent_backup));
/* print connection info except password */
if (backup->primary_conninfo)
fio_printf(out, "primary_conninfo = '%s'\n", backup->primary_conninfo);
fio_fprintf(out, "primary_conninfo = '%s'\n", backup->primary_conninfo);
}
/*
@ -507,14 +507,14 @@ write_backup(pgBackup *backup)
char conf_path[MAXPGPATH];
pgBackupGetPath(backup, conf_path, lengthof(conf_path), BACKUP_CONTROL_FILE);
fp = fio_open(conf_path, PG_BINARY_W, FIO_BACKUP_HOST);
fp = fio_fopen(conf_path, PG_BINARY_W, FIO_BACKUP_HOST);
if (fp == NULL)
elog(ERROR, "Cannot open configuration file \"%s\": %s", conf_path,
strerror(errno));
pgBackupWriteControl(fp, backup);
fio_close(fp);
fio_fclose(fp);
}
/*
@ -528,15 +528,15 @@ pgBackupWriteFileList(pgBackup *backup, parray *files, const char *root)
pgBackupGetPath(backup, path, lengthof(path), DATABASE_FILE_LIST);
fp = fio_open(path, PG_BINARY_W, FIO_BACKUP_HOST);
fp = fio_fopen(path, PG_BINARY_W, FIO_BACKUP_HOST);
if (fp == NULL)
elog(ERROR, "cannot open file list \"%s\": %s", path,
strerror(errno));
print_file_list(fp, files, root);
if (fio_flush(fp) != 0 ||
fio_close(fp))
if (fio_fflush(fp) != 0 ||
fio_fclose(fp))
elog(ERROR, "cannot write file list \"%s\": %s", path, strerror(errno));
}
@ -587,7 +587,7 @@ readBackupControlFile(const char *path)
};
pgBackupInit(backup);
if (access(path, F_OK) != 0)
if (fio_access(path, F_OK, FIO_BACKUP_HOST) != 0)
{
elog(WARNING, "Control file \"%s\" doesn't exist", path);
pgBackupFree(backup);

View File

@ -419,12 +419,12 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
COMP_TRADITIONAL_CRC32(*crc, write_buffer, write_buffer_size);
/* write data page */
if (fio_write(out, write_buffer, write_buffer_size) != write_buffer_size)
if (fio_fwrite(out, write_buffer, write_buffer_size) != write_buffer_size)
{
int errno_tmp = errno;
fclose(in);
fio_close(out);
fio_fclose(out);
elog(ERROR, "File: %s, cannot write backup at block %u : %s",
file->path, blknum, strerror(errno_tmp));
}
@ -512,7 +512,7 @@ backup_data_file(backup_files_arg* arguments,
nblocks = file->size/BLCKSZ;
/* open backup file for write */
out = fio_open(to_path, PG_BINARY_W, FIO_BACKUP_HOST);
out = fio_fopen(to_path, PG_BINARY_W, FIO_BACKUP_HOST);
if (out == NULL)
{
int errno_tmp = errno;
@ -575,13 +575,13 @@ backup_data_file(backup_files_arg* arguments,
{
int errno_tmp = errno;
fclose(in);
fio_close(out);
fio_fclose(out);
elog(ERROR, "cannot change mode of \"%s\": %s", file->path,
strerror(errno_tmp));
}
if (fio_flush(out) != 0 ||
fio_close(out))
if (fio_fflush(out) != 0 ||
fio_fclose(out))
elog(ERROR, "cannot write backup file \"%s\": %s",
to_path, strerror(errno));
fclose(in);
@ -638,7 +638,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
* modified pages for differential restore. If the file does not exist,
* re-open it with "w" to create an empty file.
*/
out = fio_open(to_path, PG_BINARY_W "+", FIO_DB_HOST);
out = fio_fopen(to_path, PG_BINARY_W "+", FIO_DB_HOST);
if (out == NULL)
{
int errno_tmp = errno;
@ -734,27 +734,27 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
/*
* Seek and write the restored page.
*/
if (fio_seek(out, write_pos) < 0)
if (fio_fseek(out, write_pos) < 0)
elog(ERROR, "cannot seek block %u of \"%s\": %s",
blknum, to_path, strerror(errno));
if (write_header)
{
if (fio_write(out, &header, sizeof(header)) != sizeof(header))
if (fio_fwrite(out, &header, sizeof(header)) != sizeof(header))
elog(ERROR, "cannot write header of block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
}
if (header.compressed_size < BLCKSZ)
{
if (fio_write(out, page.data, BLCKSZ) != BLCKSZ)
if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ)
elog(ERROR, "cannot write block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
}
else
{
/* if page wasn't compressed, we've read full block */
if (fio_write(out, compressed_page.data, BLCKSZ) != BLCKSZ)
if (fio_fwrite(out, compressed_page.data, BLCKSZ) != BLCKSZ)
elog(ERROR, "cannot write block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
}
@ -772,7 +772,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
size_t file_size = 0;
/* get file current size */
fio_seek(out, 0);
fio_fseek(out, 0);
file_size = ftell(out);
if (file_size > file->n_blocks * BLCKSZ)
@ -792,7 +792,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
/*
* Truncate file to this length.
*/
if (fio_truncate(out, write_pos) != 0)
if (fio_ftruncate(out, write_pos) != 0)
elog(ERROR, "cannot truncate \"%s\": %s",
file->path, strerror(errno));
elog(VERBOSE, "Delta truncate file %s to block %u",
@ -806,13 +806,13 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
if (in)
fclose(in);
fio_close(out);
fio_fclose(out);
elog(ERROR, "cannot change mode of \"%s\": %s", to_path,
strerror(errno_tmp));
}
if (fio_flush(out) != 0 ||
fio_close(out))
if (fio_fflush(out) != 0 ||
fio_fclose(out))
elog(ERROR, "cannot write \"%s\": %s", to_path, strerror(errno));
if (in)
fclose(in);
@ -858,7 +858,7 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
/* open backup file for write */
join_path_components(to_path, to_root, file->path + strlen(from_root) + 1);
out = fio_open(to_path, PG_BINARY_W, FIO_BACKUP_HOST);
out = fio_fopen(to_path, PG_BINARY_W, FIO_BACKUP_HOST);
if (out == NULL)
{
int errno_tmp = errno;
@ -871,7 +871,7 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
if (fstat(fileno(in), &st) == -1)
{
fclose(in);
fio_close(out);
fio_fclose(out);
elog(ERROR, "cannot stat \"%s\": %s", file->path,
strerror(errno));
}
@ -884,12 +884,12 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
if ((read_len = fread(buf, 1, sizeof(buf), in)) != sizeof(buf))
break;
if (fio_write(out, buf, read_len) != read_len)
if (fio_fwrite(out, buf, read_len) != read_len)
{
errno_tmp = errno;
/* oops */
fclose(in);
fio_close(out);
fio_fclose(out);
elog(ERROR, "cannot write to \"%s\": %s", to_path,
strerror(errno_tmp));
}
@ -903,7 +903,7 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
if (!feof(in))
{
fclose(in);
fio_close(out);
fio_fclose(out);
elog(ERROR, "cannot read backup mode file \"%s\": %s",
file->path, strerror(errno_tmp));
}
@ -911,12 +911,12 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
/* copy odd part. */
if (read_len > 0)
{
if (fio_write(out, buf, read_len) != read_len)
if (fio_fwrite(out, buf, read_len) != read_len)
{
errno_tmp = errno;
/* oops */
fclose(in);
fio_close(out);
fio_fclose(out);
elog(ERROR, "cannot write to \"%s\": %s", to_path,
strerror(errno_tmp));
}
@ -936,13 +936,13 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
{
errno_tmp = errno;
fclose(in);
fio_close(out);
fio_fclose(out);
elog(ERROR, "cannot change mode of \"%s\": %s", to_path,
strerror(errno_tmp));
}
if (fio_flush(out) != 0 ||
fio_close(out))
if (fio_fflush(out) != 0 ||
fio_fclose(out))
elog(ERROR, "cannot write \"%s\": %s", to_path, strerror(errno));
fclose(in);
@ -1059,7 +1059,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path);
out = fio_open(to_path_temp, PG_BINARY_W, FIO_BACKUP_HOST);
out = fio_fopen(to_path_temp, PG_BINARY_W, FIO_BACKUP_HOST);
if (out == NULL)
elog(ERROR, "Cannot open destination WAL file \"%s\": %s",
to_path_temp, strerror(errno));
@ -1097,7 +1097,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
else
#endif
{
if (fio_write(out, buf, read_len) != read_len)
if (fio_fwrite(out, buf, read_len) != read_len)
{
errno_temp = errno;
unlink(to_path_temp);
@ -1125,8 +1125,8 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
else
#endif
{
if (fio_flush(out) != 0 ||
fio_close(out))
if (fio_fflush(out) != 0 ||
fio_fclose(out))
{
errno_temp = errno;
unlink(to_path_temp);
@ -1217,7 +1217,7 @@ get_wal_file(const char *from_path, const char *to_path)
/* open backup file for write */
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path);
out = fio_open(to_path_temp, PG_BINARY_W, FIO_DB_HOST);
out = fio_fopen(to_path_temp, PG_BINARY_W, FIO_DB_HOST);
if (out == NULL)
elog(ERROR, "Cannot open destination WAL file \"%s\": %s",
to_path_temp, strerror(errno));
@ -1254,7 +1254,7 @@ get_wal_file(const char *from_path, const char *to_path)
if (read_len > 0)
{
if (fio_write(out, buf, read_len) != read_len)
if (fio_fwrite(out, buf, read_len) != read_len)
{
errno_temp = errno;
unlink(to_path_temp);
@ -1278,8 +1278,8 @@ get_wal_file(const char *from_path, const char *to_path)
}
}
if (fio_flush(out) != 0 ||
fio_close(out))
if (fio_fflush(out) != 0 ||
fio_fclose(out))
{
errno_temp = errno;
unlink(to_path_temp);

View File

@ -1206,7 +1206,7 @@ print_file_list(FILE *out, const parray *files, const char *root)
if (root && strstr(path, root) == path)
path = GetRelativePath(path, root);
fio_printf(out, "{\"path\":\"%s\", \"size\":\"" INT64_FORMAT "\", "
fio_fprintf(out, "{\"path\":\"%s\", \"size\":\"" INT64_FORMAT "\", "
"\"mode\":\"%u\", \"is_datafile\":\"%u\", "
"\"is_cfs\":\"%u\", \"crc\":\"%u\", "
"\"compress_alg\":\"%s\"",
@ -1215,19 +1215,19 @@ print_file_list(FILE *out, const parray *files, const char *root)
deparse_compress_alg(file->compress_alg));
if (file->is_datafile)
fio_printf(out, ",\"segno\":\"%d\"", file->segno);
fio_fprintf(out, ",\"segno\":\"%d\"", file->segno);
#ifndef WIN32
if (S_ISLNK(file->mode))
#else
if (pgwin32_is_junction(file->path))
#endif
fio_printf(out, ",\"linked\":\"%s\"", file->linked);
fio_fprintf(out, ",\"linked\":\"%s\"", file->linked);
if (file->n_blocks != BLOCKNUM_INVALID)
fio_printf(out, ",\"n_blocks\":\"%i\"", file->n_blocks);
fio_fprintf(out, ",\"n_blocks\":\"%i\"", file->n_blocks);
fio_printf(out, "}\n");
fio_fprintf(out, "}\n");
}
}

View File

@ -34,7 +34,7 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe)
int len;
snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir, path);
if ((fd = open(fullpath, O_RDONLY | PG_BINARY, 0)) == -1)
if ((fd = fio_open(fullpath, O_RDONLY | PG_BINARY, FIO_DB_HOST)) == -1)
{
if (safe)
return NULL;
@ -43,7 +43,7 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe)
fullpath, strerror(errno));
}
if (fstat(fd, &statbuf) < 0)
if (fio_stat(fd, &statbuf) < 0)
{
if (safe)
return NULL;
@ -53,10 +53,9 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe)
}
len = statbuf.st_size;
buffer = pg_malloc(len + 1);
if (read(fd, buffer, len) != len)
if (fio_read(fd, buffer, len) != len)
{
if (safe)
return NULL;
@ -65,7 +64,7 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe)
fullpath, strerror(errno));
}
close(fd);
fio_close(fd);
/* Zero-terminate the buffer. */
buffer[len] = '\0';

View File

@ -860,8 +860,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
private_data->xlogpath);
private_data->xlogexists = true;
private_data->xlogfile = open(private_data->xlogpath,
O_RDONLY | PG_BINARY, 0);
private_data->xlogfile = fio_open(private_data->xlogpath,
O_RDONLY | PG_BINARY, FIO_DB_HOST);
if (private_data->xlogfile < 0)
{
@ -910,14 +910,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
/* Read the requested page */
if (private_data->xlogfile != -1)
{
if (lseek(private_data->xlogfile, (off_t) targetPageOff, SEEK_SET) < 0)
if (fio_seek(private_data->xlogfile, (off_t) targetPageOff) < 0)
{
elog(WARNING, "Thread [%d]: Could not seek in WAL segment \"%s\": %s",
private_data->thread_num, private_data->xlogpath, strerror(errno));
return -1;
}
if (read(private_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
if (fio_read(private_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
elog(WARNING, "Thread [%d]: Could not read from WAL segment \"%s\": %s",
private_data->thread_num, private_data->xlogpath, strerror(errno));
@ -993,7 +993,7 @@ CleanupXLogPageRead(XLogReaderState *xlogreader)
private_data = (XLogPageReadPrivate *) xlogreader->private_data;
if (private_data->xlogfile >= 0)
{
close(private_data->xlogfile);
fio_close(private_data->xlogfile);
private_data->xlogfile = -1;
}
#ifdef HAVE_LIBZ

View File

@ -6,32 +6,41 @@
#include "pg_probackup.h"
#include "file.h"
#define PRINTF_BUF_SIZE 1024
#define PRINTF_BUF_SIZE 1024
#define FILE_PERMISSIONS 0600
static pthread_mutex_t fio_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t fio_read_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t fio_write_mutex = PTHREAD_MUTEX_INITIALIZER;
static unsigned long fio_fdset = 0;
static void* fio_stdin_buffer;
static int fio_stdout = 0;
static int fio_stdin = 0;
#define fio_fileno(f) (((size_t)f - 1) | FIO_PIPE_MARKER)
void fio_redirect(int in, int out)
{
fio_stdin = in;
fio_stdout = out;
}
static bool fio_is_remote_file(FILE* fd)
static bool fio_is_remote_file(FILE* file)
{
return (size_t)fd <= FIO_FDMAX;
return (size_t)file <= FIO_FDMAX;
}
static bool fio_is_remote_fd(int fd)
{
return (fd & FIO_PIPE_MARKER) != 0;
}
static bool fio_is_remote(fio_location location)
{
return (location == FIO_BACKUP_HOST && is_remote_agent)
|| (location == FIO_DB_HOST && ssh_host != NULL);
|| (location == FIO_DB_HOST && !is_remote_agent && ssh_host != NULL);
}
static ssize_t fio_read(int fd, void* buf, size_t size)
static ssize_t fio_read_all(int fd, void* buf, size_t size)
{
size_t offs = 0;
while (offs < size)
@ -42,10 +51,29 @@ static ssize_t fio_read(int fd, void* buf, size_t size)
continue;
}
return rc;
} else if (rc == 0) {
break;
}
offs += rc;
}
return size;
return offs;
}
static ssize_t fio_write_all(int fd, void const* buf, size_t size)
{
size_t offs = 0;
while (offs < size)
{
ssize_t rc = write(fd, (char*)buf + offs, size - offs);
if (rc <= 0) {
if (errno == EINTR) {
continue;
}
return rc;
}
offs += rc;
}
return offs;
}
FILE* fio_open_stream(char const* path, fio_location location)
@ -54,18 +82,19 @@ FILE* fio_open_stream(char const* path, fio_location location)
if (fio_is_remote(location))
{
fio_header hdr;
hdr.cop = FIO_READ;
hdr.cop = FIO_LOAD;
hdr.size = strlen(path) + 1;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(write(fio_stdout, path, hdr.size), hdr.size);
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, path, hdr.size), hdr.size);
IO_CHECK(fio_read(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_SEND);
if (hdr.size > 0)
{
Assert(fio_stdin_buffer == NULL);
fio_stdin_buffer = malloc(hdr.size);
IO_CHECK(fio_read(fio_stdin, fio_stdin_buffer, hdr.size), hdr.size);
IO_CHECK(fio_read_all(fio_stdin, fio_stdin_buffer, hdr.size), hdr.size);
f = fmemopen(fio_stdin_buffer, hdr.size, "r");
}
else
@ -90,32 +119,50 @@ int fio_close_stream(FILE* f)
return fclose(f);
}
FILE* fio_open(char const* path, char const* mode, fio_location location)
int fio_open(char const* path, int mode, fio_location location)
{
FILE* f;
int fd;
if (fio_is_remote(location))
{
int i;
fio_header hdr;
unsigned long mask;
SYS_CHECK(pthread_mutex_lock(&fio_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
mask = fio_fdset;
for (i = 0; (mask & 1) != 0; i++, mask >>= 1);
if (i == FIO_FDMAX) {
return NULL;
return -1;
}
hdr.cop = strchr(mode,'+') ? FIO_OPEN_EXISTED : FIO_OPEN_NEW;
hdr.cop = FIO_OPEN;
hdr.handle = i;
hdr.size = strlen(path) + 1;
hdr.arg = mode;
fio_fdset |= 1 << i;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(write(fio_stdout, path, hdr.size), hdr.size);
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, path, hdr.size), hdr.size);
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
f = (FILE*)(size_t)(i + 1);
fd = i | FIO_PIPE_MARKER;
}
else
{
fd = open(path, mode, FILE_PERMISSIONS);
}
return fd;
}
FILE* fio_fopen(char const* path, char const* mode, fio_location location)
{
FILE* f;
if (fio_is_remote(location))
{
int flags = O_RDWR|O_CREAT|PG_BINARY|(strchr(mode, '+') ? 0 : O_TRUNC);
int fd = fio_open(path, flags, location);
f = (FILE*)(size_t)((fd + 1) & ~FIO_PIPE_MARKER);
}
else
{
@ -124,12 +171,12 @@ FILE* fio_open(char const* path, char const* mode, fio_location location)
return f;
}
int fio_printf(FILE* f, char const* format, ...)
int fio_fprintf(FILE* f, char const* format, ...)
{
int rc;
va_list args;
va_start (args, format);
if (fio_stdout)
if (fio_is_remote_file(f))
{
char buf[PRINTF_BUF_SIZE];
#ifdef HAS_VSNPRINTF
@ -138,7 +185,7 @@ int fio_printf(FILE* f, char const* format, ...)
rc = vsprintf(buf, format, args);
#endif
if (rc > 0) {
fio_write(f, buf, rc);
fio_fwrite(f, buf, rc);
}
}
else
@ -149,7 +196,7 @@ int fio_printf(FILE* f, char const* format, ...)
return rc;
}
int fio_flush(FILE* f)
int fio_fflush(FILE* f)
{
int rc = 0;
if (!fio_is_remote_file(f))
@ -162,99 +209,234 @@ int fio_flush(FILE* f)
return rc;
}
int fio_close(FILE* f)
int fio_flush(int fd)
{
if (fio_is_remote_file(f))
return fio_is_remote_fd(fd) ? 0 : fsync(fd);
}
int fio_fclose(FILE* f)
{
return fio_is_remote_file(f)
? fio_close(fio_fileno(f))
: fclose(f);
}
int fio_close(int fd)
{
if (fio_is_remote_fd(fd))
{
fio_header hdr;
SYS_CHECK(pthread_mutex_lock(&fio_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
hdr.cop = FIO_CLOSE;
hdr.handle = (size_t)f - 1;
hdr.handle = fd & ~FIO_PIPE_MARKER;
hdr.size = 0;
fio_fdset &= ~(1 << hdr.handle);
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
return 0;
}
else
{
return fclose(f);
return close(fd);
}
}
int fio_truncate(FILE* f, off_t size)
int fio_ftruncate(FILE* f, off_t size)
{
if (fio_is_remote_file(f))
return fio_is_remote_file(f)
? fio_truncate(fio_fileno(f), size)
: ftruncate(fileno(f), size);
}
int fio_truncate(int fd, off_t size)
{
if (fio_is_remote_fd(fd))
{
fio_header hdr;
SYS_CHECK(pthread_mutex_lock(&fio_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
hdr.cop = FIO_TRUNCATE;
hdr.handle = (size_t)f - 1;
hdr.handle = fd & ~FIO_PIPE_MARKER;
hdr.size = 0;
hdr.arg = size;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
return 0;
}
else
{
return ftruncate(fileno(f), size);
return ftruncate(fd, size);
}
}
int fio_seek(FILE* f, off_t offs)
int fio_fseek(FILE* f, off_t offs)
{
if (fio_is_remote_file(f))
return fio_is_remote_file(f)
? fio_seek(fio_fileno(f), offs)
: fseek(f, offs, SEEK_SET);
}
int fio_seek(int fd, off_t offs)
{
if (fio_is_remote_fd(fd))
{
fio_header hdr;
SYS_CHECK(pthread_mutex_lock(&fio_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
hdr.cop = FIO_SEEK;
hdr.handle = (size_t)f - 1;
hdr.handle = fd & ~FIO_PIPE_MARKER;
hdr.size = 0;
hdr.arg = offs;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
return 0;
}
else
{
return fseek(f, offs, SEEK_SET);
return lseek(fd, offs, SEEK_SET);
}
}
size_t fio_write(FILE* f, void const* buf, size_t size)
size_t fio_fwrite(FILE* f, void const* buf, size_t size)
{
if (fio_is_remote_file(f))
return fio_is_remote_file(f)
? fio_write(fio_fileno(f), buf, size)
: fwrite(buf, 1, size, f);
}
ssize_t fio_write(int fd, void const* buf, size_t size)
{
if (fio_is_remote_fd(fd))
{
fio_header hdr;
SYS_CHECK(pthread_mutex_lock(&fio_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
hdr.cop = FIO_WRITE;
hdr.handle = (size_t)f - 1;
hdr.handle = fd & ~FIO_PIPE_MARKER;
hdr.size = size;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(write(fio_stdout, buf, size), size);
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, buf, size), size);
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
return size;
}
else
{
return fwrite(buf, 1, size, f);
return write(fd, buf, size);
}
}
size_t fio_fread(FILE* f, void* buf, size_t size)
{
return fio_is_remote_file(f)
? fio_read(fio_fileno(f), buf, size)
: fread(buf, 1, size, f);
}
ssize_t fio_read(int fd, void* buf, size_t size)
{
if (fio_is_remote_fd(fd))
{
fio_header hdr;
hdr.cop = FIO_READ;
hdr.handle = fd & ~FIO_PIPE_MARKER;
hdr.size = 0;
hdr.arg = size;
SYS_CHECK(pthread_mutex_lock(&fio_read_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_SEND);
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
SYS_CHECK(pthread_mutex_unlock(&fio_read_mutex));
return hdr.size;
}
else
{
return read(fd, buf, size);
}
}
int fio_stat(int fd, struct stat* st)
{
if (fio_is_remote_fd(fd))
{
fio_header hdr;
hdr.cop = FIO_STAT;
hdr.handle = fd & ~FIO_PIPE_MARKER;
hdr.size = 0;
SYS_CHECK(pthread_mutex_lock(&fio_read_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_STAT);
IO_CHECK(fio_read_all(fio_stdin, st, sizeof(*st)), sizeof(*st));
SYS_CHECK(pthread_mutex_unlock(&fio_read_mutex));
return hdr.arg;
}
else
{
return fstat(fd, st);
}
}
int fio_access(char const* path, int mode, fio_location location)
{
if (fio_is_remote(location))
{
fio_header hdr;
size_t path_len = strlen(path) + 1;
hdr.cop = FIO_ACCESS;
hdr.handle = -1;
hdr.size = path_len;
hdr.arg = mode;
SYS_CHECK(pthread_mutex_lock(&fio_read_mutex));
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len);
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_ACCESS);
SYS_CHECK(pthread_mutex_unlock(&fio_read_mutex));
return hdr.arg;
}
else
{
return access(path, mode);
}
}
@ -269,11 +451,13 @@ int fio_rename(char const* old_path, char const* new_path, fio_location location
hdr.handle = -1;
hdr.size = old_path_len + new_path_len;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(write(fio_stdout, old_path, old_path_len), old_path_len);
IO_CHECK(write(fio_stdout, new_path, new_path_len), new_path_len);
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, old_path, old_path_len), old_path_len);
IO_CHECK(fio_write_all(fio_stdout, new_path, new_path_len), new_path_len);
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
return 0;
}
else
@ -292,10 +476,12 @@ int fio_unlink(char const* path, fio_location location)
hdr.handle = -1;
hdr.size = path_len;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(write(fio_stdout, path, path_len), path_len);
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len);
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
return 0;
}
else
@ -315,10 +501,12 @@ int fio_mkdir(char const* path, int mode, fio_location location)
hdr.size = path_len;
hdr.arg = mode;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(write(fio_stdout, path, path_len), path_len);
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len);
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
return 0;
}
else
@ -338,10 +526,12 @@ int fio_chmod(char const* path, int mode, fio_location location)
hdr.size = path_len;
hdr.arg = mode;
IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(write(fio_stdout, path, path_len), path_len);
SYS_CHECK(pthread_mutex_lock(&fio_write_mutex));
SYS_CHECK(pthread_mutex_unlock(&fio_mutex));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len);
SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex));
return 0;
}
else
@ -356,22 +546,22 @@ static void fio_send_file(int out, char const* path)
fio_header hdr;
void* buf = NULL;
hdr.cop = FIO_SEND;
hdr.size = 0;
hdr.cop = FIO_READ;
if (fd >= 0)
{
off_t size = lseek(fd, 0, SEEK_END);
buf = malloc(size);
lseek(fd, 0, SEEK_SET);
IO_CHECK(fio_read(fd, buf, size), size);
IO_CHECK(fio_read_all(fd, buf, size), size);
hdr.size = size;
SYS_CHECK(close(fd));
}
IO_CHECK(write(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
if (buf)
{
IO_CHECK(write(out, buf, hdr.size), hdr.size);
IO_CHECK(fio_write_all(out, buf, hdr.size), hdr.size);
free(buf);
}
}
@ -379,30 +569,56 @@ static void fio_send_file(int out, char const* path)
void fio_communicate(int in, int out)
{
int fd[FIO_FDMAX];
char buf[BLCKSZ*2]; /* need more space for page header */
size_t buf_size = 128*1024;
char* buf = (char*)malloc(buf_size);
fio_header hdr;
int rc;
while ((rc = read(in, &hdr, sizeof hdr)) == sizeof(hdr)) {
while ((rc = fio_read_all(in, &hdr, sizeof hdr)) == sizeof(hdr)) {
if (hdr.size != 0) {
Assert(hdr.size < sizeof(buf));
IO_CHECK(fio_read(in, buf, hdr.size), hdr.size);
if (hdr.size > buf_size) {
buf_size = hdr.size;
buf = (char*)realloc(buf, buf_size);
}
IO_CHECK(fio_read_all(in, buf, hdr.size), hdr.size);
}
switch (hdr.cop) {
case FIO_READ:
case FIO_LOAD:
fio_send_file(out, buf);
break;
case FIO_OPEN_NEW:
SYS_CHECK(fd[hdr.handle] = open(buf, O_RDWR|O_CREAT|O_TRUNC, 0777));
break;
case FIO_OPEN_EXISTED:
SYS_CHECK(fd[hdr.handle] = open(buf, O_RDWR|O_CREAT, 0777));
case FIO_OPEN:
SYS_CHECK(fd[hdr.handle] = open(buf, hdr.arg, FILE_PERMISSIONS));
break;
case FIO_CLOSE:
SYS_CHECK(close(fd[hdr.handle]));
break;
case FIO_WRITE:
IO_CHECK(write(fd[hdr.handle], buf, hdr.size), hdr.size);
IO_CHECK(fio_write_all(fd[hdr.handle], buf, hdr.size), hdr.size);
break;
case FIO_READ:
if ((size_t)hdr.arg > buf_size) {
buf_size = hdr.arg;
buf = (char*)realloc(buf, buf_size);
}
rc = read(fd[hdr.handle], buf, hdr.arg);
hdr.cop = FIO_SEND;
hdr.size = rc > 0 ? rc : 0;
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, buf, rc), rc);
break;
case FIO_STAT:
{
struct stat st;
hdr.size = sizeof(st);
hdr.arg = fstat(fd[hdr.handle], &st);
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, &st, sizeof(st)), sizeof(st));
break;
}
case FIO_ACCESS:
hdr.size = 0;
hdr.arg = access(buf, hdr.arg);
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
break;
case FIO_RENAME:
SYS_CHECK(rename(buf, buf + strlen(buf)));
@ -426,7 +642,7 @@ void fio_communicate(int in, int out)
Assert(false);
}
}
free(buf);
if (rc != 0) {
perror("read");
exit(EXIT_FAILURE);

View File

@ -5,8 +5,7 @@
typedef enum
{
FIO_OPEN_NEW,
FIO_OPEN_EXISTED,
FIO_OPEN,
FIO_CLOSE,
FIO_WRITE,
FIO_RENAME,
@ -15,7 +14,11 @@ typedef enum
FIO_CHMOD,
FIO_SEEK,
FIO_TRUNCATE,
FIO_READ
FIO_READ,
FIO_LOAD,
FIO_STAT,
FIO_SEND,
FIO_ACCESS
} fio_operations;
typedef enum
@ -25,9 +28,10 @@ typedef enum
} fio_location;
#define FIO_FDMAX 64
#define FIO_PIPE_MARKER 0x40000000
#define SYS_CHECK(cmd) do if ((cmd) < 0) { perror(#cmd); exit(EXIT_FAILURE); } while (0)
#define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (size)) { fprintf(stderr, "Receive %d bytes instead of %d\n", _rc, (int)(size)); exit(EXIT_FAILURE); } } while (0)
#define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (size)) { fprintf(stderr, "%s:%d: proceeds %d bytes instead of %d\n", __FILE__, __LINE__, _rc, (int)(size)); exit(EXIT_FAILURE); } } while (0)
typedef struct {
unsigned cop : 4;
@ -36,20 +40,32 @@ typedef struct {
unsigned arg;
} fio_header;
extern void fio_redirect(int in, int out);
extern void fio_communicate(int in, int out);
extern void fio_redirect(int in, int out);
extern void fio_communicate(int in, int out);
extern FILE* fio_open(char const* name, char const* mode, fio_location location);
extern int fio_close(FILE* f);
extern size_t fio_write(FILE* f, void const* buf, size_t size);
extern int fio_printf(FILE* f, char const* arg, ...) __attribute__((format(printf, 2, 3)));
extern int fio_flush(FILE* f);
extern int fio_seek(FILE* f, off_t offs);
extern int fio_truncate(FILE* f, off_t size);
extern int fio_rename(char const* old_path, char const* new_path, fio_location location);
extern int fio_unlink(char const* path, fio_location location);
extern int fio_mkdir(char const* path, int mode, fio_location location);
extern int fio_chmod(char const* path, int mode, fio_location location);
extern FILE* fio_fopen(char const* name, char const* mode, fio_location location);
extern size_t fio_fwrite(FILE* f, void const* buf, size_t size);
extern size_t fio_fread(FILE* f, void* buf, size_t size);
extern int fio_fprintf(FILE* f, char const* arg, ...) __attribute__((format(printf, 2, 3)));
extern int fio_fflush(FILE* f);
extern int fio_fseek(FILE* f, off_t offs);
extern int fio_ftruncate(FILE* f, off_t size);
extern int fio_fclose(FILE* f);
extern int fio_open(char const* name, int mode, fio_location location);
extern ssize_t fio_write(int fd, void const* buf, size_t size);
extern ssize_t fio_read(int fd, void* buf, size_t size);
extern int fio_flush(int fd);
extern int fio_seek(int fd, off_t offs);
extern int fio_stat(int fd, struct stat* st);
extern int fio_truncate(int fd, off_t size);
extern int fio_close(int fd);
extern int fio_rename(char const* old_path, char const* new_path, fio_location location);
extern int fio_unlink(char const* path, fio_location location);
extern int fio_mkdir(char const* path, int mode, fio_location location);
extern int fio_chmod(char const* path, int mode, fio_location location);
extern int fio_access(char const* path, int mode, fio_location location);
extern FILE* fio_open_stream(char const* name, fio_location location);
extern int fio_close_stream(FILE* f);

View File

@ -2164,23 +2164,6 @@ pgut_strdup(const char *str)
return ret;
}
FILE *
pgut_fopen(const char *path, const char *mode, bool missing_ok)
{
FILE *fp;
if ((fp = fopen(path, mode)) == NULL)
{
if (missing_ok && errno == ENOENT)
return NULL;
elog(ERROR, "could not open file \"%s\": %s",
path, strerror(errno));
}
return fp;
}
#ifdef WIN32
static int select_win32(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timeval * timeout);
#define select select_win32

View File

@ -132,11 +132,6 @@ extern char *pgut_strdup(const char *str);
#define pgut_new(type) ((type *) pgut_malloc(sizeof(type)))
#define pgut_newarray(type, n) ((type *) pgut_malloc(sizeof(type) * (n)))
/*
* file operations
*/
extern FILE *pgut_fopen(const char *path, const char *mode, bool missing_ok);
/*
* Assert
*/

384
src/walmethods.c Normal file
View File

@ -0,0 +1,384 @@
/*-------------------------------------------------------------------------
*
* walmethods.c - implementations of different ways to write received wal
*
* NOTE! The caller must ensure that only one method is instantiated in
* any given program, and that it's only instantiated once!
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/walmethods.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
#include "pgtar.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
#include "receivelog.h"
#include "streamutil.h"
#include "pg_probackup.h"
/* Size of zlib buffer for .tar.gz */
#define ZLIB_OUT_SIZE 4096
/*-------------------------------------------------------------------------
* WalDirectoryMethod - write wal to a directory looking like pg_wal
*-------------------------------------------------------------------------
*/
/*
* Global static data for this method
*/
typedef struct DirectoryMethodData
{
char *basedir;
int compression;
bool sync;
} DirectoryMethodData;
static DirectoryMethodData *dir_data = NULL;
/*
* Local file handle
*/
typedef struct DirectoryMethodFile
{
int fd;
off_t currpos;
char *pathname;
char *fullpath;
char *temp_suffix;
#ifdef HAVE_LIBZ
gzFile gzfp;
#endif
} DirectoryMethodFile;
static const char *
dir_getlasterror(void)
{
/* Directory method always sets errno, so just use strerror */
return strerror(errno);
}
static Walfile
dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
{
static char tmppath[MAXPGPATH];
int fd;
DirectoryMethodFile *f;
#ifdef HAVE_LIBZ
gzFile gzfp = NULL;
#endif
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
dir_data->basedir, pathname,
dir_data->compression > 0 ? ".gz" : "",
temp_suffix ? temp_suffix : "");
/*
* Open a file for non-compressed as well as compressed files. Tracking
* the file descriptor is important for dir_sync() method as gzflush()
* does not do any system calls to fsync() to make changes permanent on
* disk.
*/
fd = fio_open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, FIO_BACKUP_HOST);
if (fd < 0)
return NULL;
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
{
gzfp = gzdopen(fd, "wb");
if (gzfp == NULL)
{
close(fd);
return NULL;
}
if (gzsetparams(gzfp, dir_data->compression,
Z_DEFAULT_STRATEGY) != Z_OK)
{
gzclose(gzfp);
return NULL;
}
}
#endif
/* Do pre-padding on non-compressed files */
if (pad_to_size && dir_data->compression == 0)
{
PGAlignedXLogBlock zerobuf;
int bytes;
memset(zerobuf.data, 0, XLOG_BLCKSZ);
for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
{
errno = 0;
if (fio_write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
int save_errno = errno;
fio_close(fd);
/*
* If write didn't set errno, assume problem is no disk space.
*/
errno = save_errno ? save_errno : ENOSPC;
return NULL;
}
}
if (fio_seek(fd, 0) != 0)
{
int save_errno = errno;
fio_close(fd);
errno = save_errno;
return NULL;
}
}
/*
* fsync WAL file and containing directory, to ensure the file is
* persistently created and zeroed (if padded). That's particularly
* important when using synchronous mode, where the file is modified and
* fsynced in-place, without a directory fsync.
*/
if (!is_remote_agent && dir_data->sync)
{
if (fsync_fname(tmppath, false, progname) != 0 ||
fsync_parent_path(tmppath, progname) != 0)
{
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
gzclose(gzfp);
else
#endif
close(fd);
return NULL;
}
}
f = pg_malloc0(sizeof(DirectoryMethodFile));
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
f->gzfp = gzfp;
#endif
f->fd = fd;
f->currpos = 0;
f->pathname = pg_strdup(pathname);
f->fullpath = pg_strdup(tmppath);
if (temp_suffix)
f->temp_suffix = pg_strdup(temp_suffix);
return f;
}
static ssize_t
dir_write(Walfile f, const void *buf, size_t count)
{
ssize_t r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
Assert(f != NULL);
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
r = (ssize_t) gzwrite(df->gzfp, buf, count);
else
#endif
r = fio_write(df->fd, buf, count);
if (r > 0)
df->currpos += r;
return r;
}
static off_t
dir_get_current_pos(Walfile f)
{
Assert(f != NULL);
/* Use a cached value to prevent lots of reseeks */
return ((DirectoryMethodFile *) f)->currpos;
}
static int
dir_close(Walfile f, WalCloseMethod method)
{
int r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
static char tmppath[MAXPGPATH];
static char tmppath2[MAXPGPATH];
Assert(f != NULL);
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
r = gzclose(df->gzfp);
else
#endif
r = fio_close(df->fd);
if (r == 0)
{
/* Build path to the current version of the file */
if (method == CLOSE_NORMAL && df->temp_suffix)
{
/*
* If we have a temp prefix, normal operation is to rename the
* file.
*/
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
dir_data->basedir, df->pathname,
dir_data->compression > 0 ? ".gz" : "",
df->temp_suffix);
snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
dir_data->basedir, df->pathname,
dir_data->compression > 0 ? ".gz" : "");
r = durable_rename(tmppath, tmppath2, progname);
}
else if (method == CLOSE_UNLINK)
{
/* Unlink the file once it's closed */
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
dir_data->basedir, df->pathname,
dir_data->compression > 0 ? ".gz" : "",
df->temp_suffix ? df->temp_suffix : "");
r = unlink(tmppath);
}
else
{
/*
* Else either CLOSE_NORMAL and no temp suffix, or
* CLOSE_NO_RENAME. In this case, fsync the file and containing
* directory if sync mode is requested.
*/
if (dir_data->sync && !is_remote_agent)
{
r = fsync_fname(df->fullpath, false, progname);
if (r == 0)
r = fsync_parent_path(df->fullpath, progname);
}
}
}
pg_free(df->pathname);
pg_free(df->fullpath);
if (df->temp_suffix)
pg_free(df->temp_suffix);
pg_free(df);
return r;
}
static int
dir_sync(Walfile f)
{
Assert(f != NULL);
if (!dir_data->sync)
return 0;
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
{
if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
return -1;
}
#endif
return fio_flush(((DirectoryMethodFile *) f)->fd);
}
static ssize_t
dir_get_file_size(const char *pathname)
{
struct stat statbuf;
static char tmppath[MAXPGPATH];
int fd;
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
fd = fio_open(tmppath, O_RDONLY|PG_BINARY, FIO_BACKUP_HOST);
if (fd >= 0)
{
if (fio_stat(fd, &statbuf) != 0)
{
fio_close(fd);
return -1;
}
fio_close(fd);
return statbuf.st_size;
}
return -1;
}
static bool
dir_existsfile(const char *pathname)
{
static char tmppath[MAXPGPATH];
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
return fio_access(tmppath, F_OK, FIO_BACKUP_HOST) == 0;
}
static bool
dir_finish(void)
{
if (dir_data->sync && !is_remote_agent)
{
/*
* Files are fsynced when they are closed, but we need to fsync the
* directory entry here as well.
*/
if (fsync_fname(dir_data->basedir, true, progname) != 0)
return false;
}
return true;
}
WalWriteMethod *
CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
{
WalWriteMethod *method;
method = pg_malloc0(sizeof(WalWriteMethod));
method->open_for_write = dir_open_for_write;
method->write = dir_write;
method->get_current_pos = dir_get_current_pos;
method->get_file_size = dir_get_file_size;
method->close = dir_close;
method->sync = dir_sync;
method->existsfile = dir_existsfile;
method->finish = dir_finish;
method->getlasterror = dir_getlasterror;
dir_data = pg_malloc0(sizeof(DirectoryMethodData));
dir_data->compression = compression;
dir_data->basedir = pg_strdup(basedir);
dir_data->sync = sync;
return method;
}
void
FreeWalDirectoryMethod(void)
{
pg_free(dir_data->basedir);
pg_free(dir_data);
}

92
src/walmethods.h Normal file
View File

@ -0,0 +1,92 @@
/*-------------------------------------------------------------------------
*
* walmethods.h
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/walmethods.h
*-------------------------------------------------------------------------
*/
typedef void *Walfile;
typedef enum
{
CLOSE_NORMAL,
CLOSE_UNLINK,
CLOSE_NO_RENAME
} WalCloseMethod;
/*
* A WalWriteMethod structure represents the different methods used
* to write the streaming WAL as it's received.
*
* All methods that have a failure return indicator will set state
* allowing the getlasterror() method to return a suitable message.
* Commonly, errno is this state (or part of it); so callers must take
* care not to clobber errno between a failed method call and use of
* getlasterror() to retrieve the message.
*/
typedef struct WalWriteMethod WalWriteMethod;
struct WalWriteMethod
{
/*
* Open a target file. Returns Walfile, or NULL if open failed. If a temp
* suffix is specified, a file with that name will be opened, and then
* automatically renamed in close(). If pad_to_size is specified, the file
* will be padded with NUL up to that size, if supported by the Walmethod.
*/
Walfile (*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
/*
* Close an open Walfile, using one or more methods for handling automatic
* unlinking etc. Returns 0 on success, other values for error.
*/
int (*close) (Walfile f, WalCloseMethod method);
/* Check if a file exist */
bool (*existsfile) (const char *pathname);
/* Return the size of a file, or -1 on failure. */
ssize_t (*get_file_size) (const char *pathname);
/*
* Write count number of bytes to the file, and return the number of bytes
* actually written or -1 for error.
*/
ssize_t (*write) (Walfile f, const void *buf, size_t count);
/* Return the current position in a file or -1 on error */
off_t (*get_current_pos) (Walfile f);
/*
* fsync the contents of the specified file. Returns 0 on success.
*/
int (*sync) (Walfile f);
/*
* Clean up the Walmethod, closing any shared resources. For methods like
* tar, this includes writing updated headers. Returns true if the
* close/write/sync of shared resources succeeded, otherwise returns false
* (but the resources are still closed).
*/
bool (*finish) (void);
/* Return a text for the last error in this Walfile */
const char *(*getlasterror) (void);
};
/*
* Available WAL methods:
* - WalDirectoryMethod - write WAL to regular files in a standard pg_wal
* - TarDirectoryMethod - write WAL to a tarfile corresponding to pg_wal
* (only implements the methods required for pg_basebackup,
* not all those required for pg_receivewal)
*/
WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
int compression, bool sync);
/* Cleanup routines for previously-created methods */
void FreeWalDirectoryMethod(void);