diff --git a/.gitignore b/.gitignore index 4fc21d91..da388019 100644 --- a/.gitignore +++ b/.gitignore @@ -42,5 +42,3 @@ /src/streamutil.c /src/streamutil.h /src/xlogreader.c -/src/walmethods.c -/src/walmethods.h diff --git a/Makefile b/Makefile index f3e14b5d..75f58f8f 100644 --- a/Makefile +++ b/Makefile @@ -11,13 +11,13 @@ OBJS += src/archive.o src/backup.o src/catalog.o src/configure.o src/data.o \ # borrowed files OBJS += src/pg_crc.o src/datapagemap.o src/receivelog.o src/streamutil.o \ - src/xlogreader.o + src/xlogreader.o src/walmethods.o EXTRA_CLEAN = src/pg_crc.c src/datapagemap.c src/datapagemap.h src/logging.h \ src/receivelog.c src/receivelog.h src/streamutil.c src/streamutil.h \ src/xlogreader.c -INCLUDES = src/datapagemap.h src/logging.h src/streamutil.h src/receivelog.h +INCLUDES = src/datapagemap.h src/logging.h src/streamutil.h src/receivelog.h src/walmethods.h ifdef USE_PGXS PG_CONFIG = pg_config @@ -39,12 +39,6 @@ else srchome=$(top_srcdir) endif -ifeq (,$(filter 9.5 9.6,$(MAJORVERSION))) -OBJS += src/walmethods.o -EXTRA_CLEAN += src/walmethods.c src/walmethods.h -INCLUDES += src/walmethods.h -endif - PG_CPPFLAGS = -I$(libpq_srcdir) ${PTHREAD_CFLAGS} -Isrc -I$(top_srcdir)/$(subdir)/src override CPPFLAGS := -DFRONTEND $(CPPFLAGS) $(PG_CPPFLAGS) PG_LIBS = $(libpq_pgport) ${PTHREAD_CFLAGS} @@ -73,13 +67,6 @@ src/xlogreader.c: $(top_srcdir)/src/backend/access/transam/xlogreader.c rm -f $@ && $(LN_S) $(srchome)/src/backend/access/transam/xlogreader.c $@ -ifeq (,$(filter 9.5 9.6,$(MAJORVERSION))) -src/walmethods.c: $(top_srcdir)/src/bin/pg_basebackup/walmethods.c - rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_basebackup/walmethods.c $@ -src/walmethods.h: $(top_srcdir)/src/bin/pg_basebackup/walmethods.h - rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_basebackup/walmethods.h $@ -endif - ifeq ($(PORTNAME), aix) CC=xlc_r endif diff --git a/src/backup.c b/src/backup.c index 5abdc12c..9ef93fb9 100644 --- a/src/backup.c +++ b/src/backup.c @@ -1822,6 +1822,7 @@ pg_stop_backup(pgBackup *backup) PQerrorMessage(conn), stop_backup_query); } elog(INFO, "pg_stop backup() successfully executed"); + sleep(20); } backup_in_progress = false; @@ -1848,15 +1849,15 @@ pg_stop_backup(pgBackup *backup) /* Write backup_label */ join_path_components(backup_label, path, PG_BACKUP_LABEL_FILE); - fp = fio_open(backup_label, PG_BINARY_W, FIO_BACKUP_HOST); + fp = fio_fopen(backup_label, PG_BINARY_W, FIO_BACKUP_HOST); if (fp == NULL) elog(ERROR, "can't open backup label file \"%s\": %s", backup_label, strerror(errno)); len = strlen(PQgetvalue(res, 0, 3)); - if (fio_write(fp, PQgetvalue(res, 0, 3), len) != len || - fio_flush(fp) != 0 || - fio_close(fp)) + if (fio_fwrite(fp, PQgetvalue(res, 0, 3), len) != len || + fio_fflush(fp) != 0 || + fio_fclose(fp)) elog(ERROR, "can't write backup label file \"%s\": %s", backup_label, strerror(errno)); @@ -1895,15 +1896,15 @@ pg_stop_backup(pgBackup *backup) char tablespace_map[MAXPGPATH]; join_path_components(tablespace_map, path, PG_TABLESPACE_MAP_FILE); - fp = fio_open(tablespace_map, PG_BINARY_W, FIO_BACKUP_HOST); + fp = fio_fopen(tablespace_map, PG_BINARY_W, FIO_BACKUP_HOST); if (fp == NULL) elog(ERROR, "can't open tablespace map file \"%s\": %s", tablespace_map, strerror(errno)); len = strlen(val); - if (fio_write(fp, val, len) != len || - fio_flush(fp) != 0 || - fio_close(fp)) + if (fio_fwrite(fp, val, len) != len || + fio_fflush(fp) != 0 || + fio_fclose(fp)) elog(ERROR, "can't write tablespace map file \"%s\": %s", tablespace_map, strerror(errno)); diff --git a/src/catalog.c b/src/catalog.c index fae199c6..bf373bac 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -26,8 +26,8 @@ static void unlink_lock_atexit(void) { int res; - res = unlink(lock_file); - if (res != 0 && res != ENOENT) + res = fio_unlink(lock_file, FIO_BACKUP_HOST); + if (res != 0 && errno != ENOENT) elog(WARNING, "%s: %s", lock_file, strerror(errno)); } @@ -90,7 +90,7 @@ catalog_lock(void) * Think not to make the file protection weaker than 0600. See * comments below. */ - fd = open(lock_file, O_RDWR | O_CREAT | O_EXCL, 0600); + fd = fio_open(lock_file, O_RDWR | O_CREAT | O_EXCL, FIO_BACKUP_HOST); if (fd >= 0) break; /* Success; exit the retry loop */ @@ -105,7 +105,7 @@ catalog_lock(void) * Read the file to get the old owner's PID. Note race condition * here: file might have been deleted since we tried to create it. */ - fd = open(lock_file, O_RDONLY, 0600); + fd = fio_open(lock_file, O_RDONLY, FIO_BACKUP_HOST); if (fd < 0) { if (errno == ENOENT) @@ -113,10 +113,10 @@ catalog_lock(void) elog(ERROR, "could not open lock file \"%s\": %s", lock_file, strerror(errno)); } - if ((len = read(fd, buffer, sizeof(buffer) - 1)) < 0) + if ((len = fio_read(fd, buffer, sizeof(buffer) - 1)) < 0) elog(ERROR, "could not read lock file \"%s\": %s", lock_file, strerror(errno)); - close(fd); + fio_close(fd); if (len == 0) elog(ERROR, "lock file \"%s\" is empty", lock_file); @@ -149,7 +149,7 @@ catalog_lock(void) * it. Need a loop because of possible race condition against other * would-be creators. */ - if (unlink(lock_file) < 0) + if (fio_unlink(lock_file, FIO_BACKUP_HOST) < 0) elog(ERROR, "could not remove old lock file \"%s\": %s", lock_file, strerror(errno)); } @@ -160,32 +160,32 @@ catalog_lock(void) snprintf(buffer, sizeof(buffer), "%d\n", my_pid); errno = 0; - if (write(fd, buffer, strlen(buffer)) != strlen(buffer)) + if (fio_write(fd, buffer, strlen(buffer)) != strlen(buffer)) { int save_errno = errno; - close(fd); - unlink(lock_file); + fio_close(fd); + fio_unlink(lock_file, FIO_BACKUP_HOST); /* if write didn't set errno, assume problem is no disk space */ errno = save_errno ? save_errno : ENOSPC; elog(ERROR, "could not write lock file \"%s\": %s", lock_file, strerror(errno)); } - if (fsync(fd) != 0) + if (fio_flush(fd) != 0) { int save_errno = errno; - close(fd); - unlink(lock_file); + fio_close(fd); + fio_unlink(lock_file, FIO_BACKUP_HOST); errno = save_errno; elog(ERROR, "could not write lock file \"%s\": %s", lock_file, strerror(errno)); } - if (close(fd) != 0) + if (fio_close(fd) != 0) { int save_errno = errno; - unlink(lock_file); + fio_unlink(lock_file, FIO_BACKUP_HOST); errno = save_errno; elog(ERROR, "could not write lock file \"%s\": %s", lock_file, strerror(errno)); @@ -434,46 +434,46 @@ pgBackupWriteControl(FILE *out, pgBackup *backup) { char timestamp[100]; - fio_printf(out, "#Configuration\n"); - fio_printf(out, "backup-mode = %s\n", pgBackupGetBackupMode(backup)); - fio_printf(out, "stream = %s\n", backup->stream ? "true" : "false"); - fio_printf(out, "compress-alg = %s\n", + fio_fprintf(out, "#Configuration\n"); + fio_fprintf(out, "backup-mode = %s\n", pgBackupGetBackupMode(backup)); + fio_fprintf(out, "stream = %s\n", backup->stream ? "true" : "false"); + fio_fprintf(out, "compress-alg = %s\n", deparse_compress_alg(backup->compress_alg)); - fio_printf(out, "compress-level = %d\n", backup->compress_level); - fio_printf(out, "from-replica = %s\n", backup->from_replica ? "true" : "false"); + fio_fprintf(out, "compress-level = %d\n", backup->compress_level); + fio_fprintf(out, "from-replica = %s\n", backup->from_replica ? "true" : "false"); - fio_printf(out, "\n#Compatibility\n"); - fio_printf(out, "block-size = %u\n", backup->block_size); - fio_printf(out, "xlog-block-size = %u\n", backup->wal_block_size); - fio_printf(out, "checksum-version = %u\n", backup->checksum_version); + fio_fprintf(out, "\n#Compatibility\n"); + fio_fprintf(out, "block-size = %u\n", backup->block_size); + fio_fprintf(out, "xlog-block-size = %u\n", backup->wal_block_size); + fio_fprintf(out, "checksum-version = %u\n", backup->checksum_version); if (backup->program_version[0] != '\0') - fio_printf(out, "program-version = %s\n", backup->program_version); + fio_fprintf(out, "program-version = %s\n", backup->program_version); if (backup->server_version[0] != '\0') - fio_printf(out, "server-version = %s\n", backup->server_version); + fio_fprintf(out, "server-version = %s\n", backup->server_version); - fio_printf(out, "\n#Result backup info\n"); - fio_printf(out, "timelineid = %d\n", backup->tli); + fio_fprintf(out, "\n#Result backup info\n"); + fio_fprintf(out, "timelineid = %d\n", backup->tli); /* LSN returned by pg_start_backup */ - fio_printf(out, "start-lsn = %X/%X\n", + fio_fprintf(out, "start-lsn = %X/%X\n", (uint32) (backup->start_lsn >> 32), (uint32) backup->start_lsn); /* LSN returned by pg_stop_backup */ - fio_printf(out, "stop-lsn = %X/%X\n", + fio_fprintf(out, "stop-lsn = %X/%X\n", (uint32) (backup->stop_lsn >> 32), (uint32) backup->stop_lsn); time2iso(timestamp, lengthof(timestamp), backup->start_time); - fio_printf(out, "start-time = '%s'\n", timestamp); + fio_fprintf(out, "start-time = '%s'\n", timestamp); if (backup->end_time > 0) { time2iso(timestamp, lengthof(timestamp), backup->end_time); - fio_printf(out, "end-time = '%s'\n", timestamp); + fio_fprintf(out, "end-time = '%s'\n", timestamp); } - fio_printf(out, "recovery-xid = " XID_FMT "\n", backup->recovery_xid); + fio_fprintf(out, "recovery-xid = " XID_FMT "\n", backup->recovery_xid); if (backup->recovery_time > 0) { time2iso(timestamp, lengthof(timestamp), backup->recovery_time); - fio_printf(out, "recovery-time = '%s'\n", timestamp); + fio_fprintf(out, "recovery-time = '%s'\n", timestamp); } /* @@ -481,20 +481,20 @@ pgBackupWriteControl(FILE *out, pgBackup *backup) * WAL segments in archive 'wal' directory. */ if (backup->data_bytes != BYTES_INVALID) - fio_printf(out, "data-bytes = " INT64_FORMAT "\n", backup->data_bytes); + fio_fprintf(out, "data-bytes = " INT64_FORMAT "\n", backup->data_bytes); if (backup->wal_bytes != BYTES_INVALID) - fio_printf(out, "wal-bytes = " INT64_FORMAT "\n", backup->wal_bytes); + fio_fprintf(out, "wal-bytes = " INT64_FORMAT "\n", backup->wal_bytes); - fio_printf(out, "status = %s\n", status2str(backup->status)); + fio_fprintf(out, "status = %s\n", status2str(backup->status)); /* 'parent_backup' is set if it is incremental backup */ if (backup->parent_backup != 0) - fio_printf(out, "parent-backup-id = '%s'\n", base36enc(backup->parent_backup)); + fio_fprintf(out, "parent-backup-id = '%s'\n", base36enc(backup->parent_backup)); /* print connection info except password */ if (backup->primary_conninfo) - fio_printf(out, "primary_conninfo = '%s'\n", backup->primary_conninfo); + fio_fprintf(out, "primary_conninfo = '%s'\n", backup->primary_conninfo); } /* @@ -507,14 +507,14 @@ write_backup(pgBackup *backup) char conf_path[MAXPGPATH]; pgBackupGetPath(backup, conf_path, lengthof(conf_path), BACKUP_CONTROL_FILE); - fp = fio_open(conf_path, PG_BINARY_W, FIO_BACKUP_HOST); + fp = fio_fopen(conf_path, PG_BINARY_W, FIO_BACKUP_HOST); if (fp == NULL) elog(ERROR, "Cannot open configuration file \"%s\": %s", conf_path, strerror(errno)); pgBackupWriteControl(fp, backup); - fio_close(fp); + fio_fclose(fp); } /* @@ -528,15 +528,15 @@ pgBackupWriteFileList(pgBackup *backup, parray *files, const char *root) pgBackupGetPath(backup, path, lengthof(path), DATABASE_FILE_LIST); - fp = fio_open(path, PG_BINARY_W, FIO_BACKUP_HOST); + fp = fio_fopen(path, PG_BINARY_W, FIO_BACKUP_HOST); if (fp == NULL) elog(ERROR, "cannot open file list \"%s\": %s", path, strerror(errno)); print_file_list(fp, files, root); - if (fio_flush(fp) != 0 || - fio_close(fp)) + if (fio_fflush(fp) != 0 || + fio_fclose(fp)) elog(ERROR, "cannot write file list \"%s\": %s", path, strerror(errno)); } @@ -587,7 +587,7 @@ readBackupControlFile(const char *path) }; pgBackupInit(backup); - if (access(path, F_OK) != 0) + if (fio_access(path, F_OK, FIO_BACKUP_HOST) != 0) { elog(WARNING, "Control file \"%s\" doesn't exist", path); pgBackupFree(backup); diff --git a/src/data.c b/src/data.c index 05644a88..87db7047 100644 --- a/src/data.c +++ b/src/data.c @@ -419,12 +419,12 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum, COMP_TRADITIONAL_CRC32(*crc, write_buffer, write_buffer_size); /* write data page */ - if (fio_write(out, write_buffer, write_buffer_size) != write_buffer_size) + if (fio_fwrite(out, write_buffer, write_buffer_size) != write_buffer_size) { int errno_tmp = errno; fclose(in); - fio_close(out); + fio_fclose(out); elog(ERROR, "File: %s, cannot write backup at block %u : %s", file->path, blknum, strerror(errno_tmp)); } @@ -512,7 +512,7 @@ backup_data_file(backup_files_arg* arguments, nblocks = file->size/BLCKSZ; /* open backup file for write */ - out = fio_open(to_path, PG_BINARY_W, FIO_BACKUP_HOST); + out = fio_fopen(to_path, PG_BINARY_W, FIO_BACKUP_HOST); if (out == NULL) { int errno_tmp = errno; @@ -575,13 +575,13 @@ backup_data_file(backup_files_arg* arguments, { int errno_tmp = errno; fclose(in); - fio_close(out); + fio_fclose(out); elog(ERROR, "cannot change mode of \"%s\": %s", file->path, strerror(errno_tmp)); } - if (fio_flush(out) != 0 || - fio_close(out)) + if (fio_fflush(out) != 0 || + fio_fclose(out)) elog(ERROR, "cannot write backup file \"%s\": %s", to_path, strerror(errno)); fclose(in); @@ -638,7 +638,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, * modified pages for differential restore. If the file does not exist, * re-open it with "w" to create an empty file. */ - out = fio_open(to_path, PG_BINARY_W "+", FIO_DB_HOST); + out = fio_fopen(to_path, PG_BINARY_W "+", FIO_DB_HOST); if (out == NULL) { int errno_tmp = errno; @@ -734,27 +734,27 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, /* * Seek and write the restored page. */ - if (fio_seek(out, write_pos) < 0) + if (fio_fseek(out, write_pos) < 0) elog(ERROR, "cannot seek block %u of \"%s\": %s", blknum, to_path, strerror(errno)); if (write_header) { - if (fio_write(out, &header, sizeof(header)) != sizeof(header)) + if (fio_fwrite(out, &header, sizeof(header)) != sizeof(header)) elog(ERROR, "cannot write header of block %u of \"%s\": %s", blknum, file->path, strerror(errno)); } if (header.compressed_size < BLCKSZ) { - if (fio_write(out, page.data, BLCKSZ) != BLCKSZ) + if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ) elog(ERROR, "cannot write block %u of \"%s\": %s", blknum, file->path, strerror(errno)); } else { /* if page wasn't compressed, we've read full block */ - if (fio_write(out, compressed_page.data, BLCKSZ) != BLCKSZ) + if (fio_fwrite(out, compressed_page.data, BLCKSZ) != BLCKSZ) elog(ERROR, "cannot write block %u of \"%s\": %s", blknum, file->path, strerror(errno)); } @@ -772,7 +772,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, size_t file_size = 0; /* get file current size */ - fio_seek(out, 0); + fio_fseek(out, 0); file_size = ftell(out); if (file_size > file->n_blocks * BLCKSZ) @@ -792,7 +792,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, /* * Truncate file to this length. */ - if (fio_truncate(out, write_pos) != 0) + if (fio_ftruncate(out, write_pos) != 0) elog(ERROR, "cannot truncate \"%s\": %s", file->path, strerror(errno)); elog(VERBOSE, "Delta truncate file %s to block %u", @@ -806,13 +806,13 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, if (in) fclose(in); - fio_close(out); + fio_fclose(out); elog(ERROR, "cannot change mode of \"%s\": %s", to_path, strerror(errno_tmp)); } - if (fio_flush(out) != 0 || - fio_close(out)) + if (fio_fflush(out) != 0 || + fio_fclose(out)) elog(ERROR, "cannot write \"%s\": %s", to_path, strerror(errno)); if (in) fclose(in); @@ -858,7 +858,7 @@ copy_file(const char *from_root, const char *to_root, pgFile *file) /* open backup file for write */ join_path_components(to_path, to_root, file->path + strlen(from_root) + 1); - out = fio_open(to_path, PG_BINARY_W, FIO_BACKUP_HOST); + out = fio_fopen(to_path, PG_BINARY_W, FIO_BACKUP_HOST); if (out == NULL) { int errno_tmp = errno; @@ -871,7 +871,7 @@ copy_file(const char *from_root, const char *to_root, pgFile *file) if (fstat(fileno(in), &st) == -1) { fclose(in); - fio_close(out); + fio_fclose(out); elog(ERROR, "cannot stat \"%s\": %s", file->path, strerror(errno)); } @@ -884,12 +884,12 @@ copy_file(const char *from_root, const char *to_root, pgFile *file) if ((read_len = fread(buf, 1, sizeof(buf), in)) != sizeof(buf)) break; - if (fio_write(out, buf, read_len) != read_len) + if (fio_fwrite(out, buf, read_len) != read_len) { errno_tmp = errno; /* oops */ fclose(in); - fio_close(out); + fio_fclose(out); elog(ERROR, "cannot write to \"%s\": %s", to_path, strerror(errno_tmp)); } @@ -903,7 +903,7 @@ copy_file(const char *from_root, const char *to_root, pgFile *file) if (!feof(in)) { fclose(in); - fio_close(out); + fio_fclose(out); elog(ERROR, "cannot read backup mode file \"%s\": %s", file->path, strerror(errno_tmp)); } @@ -911,12 +911,12 @@ copy_file(const char *from_root, const char *to_root, pgFile *file) /* copy odd part. */ if (read_len > 0) { - if (fio_write(out, buf, read_len) != read_len) + if (fio_fwrite(out, buf, read_len) != read_len) { errno_tmp = errno; /* oops */ fclose(in); - fio_close(out); + fio_fclose(out); elog(ERROR, "cannot write to \"%s\": %s", to_path, strerror(errno_tmp)); } @@ -936,13 +936,13 @@ copy_file(const char *from_root, const char *to_root, pgFile *file) { errno_tmp = errno; fclose(in); - fio_close(out); + fio_fclose(out); elog(ERROR, "cannot change mode of \"%s\": %s", to_path, strerror(errno_tmp)); } - if (fio_flush(out) != 0 || - fio_close(out)) + if (fio_fflush(out) != 0 || + fio_fclose(out)) elog(ERROR, "cannot write \"%s\": %s", to_path, strerror(errno)); fclose(in); @@ -1059,7 +1059,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); - out = fio_open(to_path_temp, PG_BINARY_W, FIO_BACKUP_HOST); + out = fio_fopen(to_path_temp, PG_BINARY_W, FIO_BACKUP_HOST); if (out == NULL) elog(ERROR, "Cannot open destination WAL file \"%s\": %s", to_path_temp, strerror(errno)); @@ -1097,7 +1097,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, else #endif { - if (fio_write(out, buf, read_len) != read_len) + if (fio_fwrite(out, buf, read_len) != read_len) { errno_temp = errno; unlink(to_path_temp); @@ -1125,8 +1125,8 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, else #endif { - if (fio_flush(out) != 0 || - fio_close(out)) + if (fio_fflush(out) != 0 || + fio_fclose(out)) { errno_temp = errno; unlink(to_path_temp); @@ -1217,7 +1217,7 @@ get_wal_file(const char *from_path, const char *to_path) /* open backup file for write */ snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); - out = fio_open(to_path_temp, PG_BINARY_W, FIO_DB_HOST); + out = fio_fopen(to_path_temp, PG_BINARY_W, FIO_DB_HOST); if (out == NULL) elog(ERROR, "Cannot open destination WAL file \"%s\": %s", to_path_temp, strerror(errno)); @@ -1254,7 +1254,7 @@ get_wal_file(const char *from_path, const char *to_path) if (read_len > 0) { - if (fio_write(out, buf, read_len) != read_len) + if (fio_fwrite(out, buf, read_len) != read_len) { errno_temp = errno; unlink(to_path_temp); @@ -1278,8 +1278,8 @@ get_wal_file(const char *from_path, const char *to_path) } } - if (fio_flush(out) != 0 || - fio_close(out)) + if (fio_fflush(out) != 0 || + fio_fclose(out)) { errno_temp = errno; unlink(to_path_temp); diff --git a/src/dir.c b/src/dir.c index 9509a2f5..ac4d096e 100644 --- a/src/dir.c +++ b/src/dir.c @@ -1206,7 +1206,7 @@ print_file_list(FILE *out, const parray *files, const char *root) if (root && strstr(path, root) == path) path = GetRelativePath(path, root); - fio_printf(out, "{\"path\":\"%s\", \"size\":\"" INT64_FORMAT "\", " + fio_fprintf(out, "{\"path\":\"%s\", \"size\":\"" INT64_FORMAT "\", " "\"mode\":\"%u\", \"is_datafile\":\"%u\", " "\"is_cfs\":\"%u\", \"crc\":\"%u\", " "\"compress_alg\":\"%s\"", @@ -1215,19 +1215,19 @@ print_file_list(FILE *out, const parray *files, const char *root) deparse_compress_alg(file->compress_alg)); if (file->is_datafile) - fio_printf(out, ",\"segno\":\"%d\"", file->segno); + fio_fprintf(out, ",\"segno\":\"%d\"", file->segno); #ifndef WIN32 if (S_ISLNK(file->mode)) #else if (pgwin32_is_junction(file->path)) #endif - fio_printf(out, ",\"linked\":\"%s\"", file->linked); + fio_fprintf(out, ",\"linked\":\"%s\"", file->linked); if (file->n_blocks != BLOCKNUM_INVALID) - fio_printf(out, ",\"n_blocks\":\"%i\"", file->n_blocks); + fio_fprintf(out, ",\"n_blocks\":\"%i\"", file->n_blocks); - fio_printf(out, "}\n"); + fio_fprintf(out, "}\n"); } } diff --git a/src/fetch.c b/src/fetch.c index 17e77025..56194611 100644 --- a/src/fetch.c +++ b/src/fetch.c @@ -34,7 +34,7 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe) int len; snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir, path); - if ((fd = open(fullpath, O_RDONLY | PG_BINARY, 0)) == -1) + if ((fd = fio_open(fullpath, O_RDONLY | PG_BINARY, FIO_DB_HOST)) == -1) { if (safe) return NULL; @@ -43,7 +43,7 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe) fullpath, strerror(errno)); } - if (fstat(fd, &statbuf) < 0) + if (fio_stat(fd, &statbuf) < 0) { if (safe) return NULL; @@ -53,10 +53,9 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe) } len = statbuf.st_size; - buffer = pg_malloc(len + 1); - if (read(fd, buffer, len) != len) + if (fio_read(fd, buffer, len) != len) { if (safe) return NULL; @@ -65,7 +64,7 @@ slurpFile(const char *datadir, const char *path, size_t *filesize, bool safe) fullpath, strerror(errno)); } - close(fd); + fio_close(fd); /* Zero-terminate the buffer. */ buffer[len] = '\0'; diff --git a/src/parsexlog.c b/src/parsexlog.c index 7f7365f5..29b98d86 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -860,8 +860,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, private_data->xlogpath); private_data->xlogexists = true; - private_data->xlogfile = open(private_data->xlogpath, - O_RDONLY | PG_BINARY, 0); + private_data->xlogfile = fio_open(private_data->xlogpath, + O_RDONLY | PG_BINARY, FIO_DB_HOST); if (private_data->xlogfile < 0) { @@ -910,14 +910,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, /* Read the requested page */ if (private_data->xlogfile != -1) { - if (lseek(private_data->xlogfile, (off_t) targetPageOff, SEEK_SET) < 0) + if (fio_seek(private_data->xlogfile, (off_t) targetPageOff) < 0) { elog(WARNING, "Thread [%d]: Could not seek in WAL segment \"%s\": %s", private_data->thread_num, private_data->xlogpath, strerror(errno)); return -1; } - if (read(private_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + if (fio_read(private_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { elog(WARNING, "Thread [%d]: Could not read from WAL segment \"%s\": %s", private_data->thread_num, private_data->xlogpath, strerror(errno)); @@ -993,7 +993,7 @@ CleanupXLogPageRead(XLogReaderState *xlogreader) private_data = (XLogPageReadPrivate *) xlogreader->private_data; if (private_data->xlogfile >= 0) { - close(private_data->xlogfile); + fio_close(private_data->xlogfile); private_data->xlogfile = -1; } #ifdef HAVE_LIBZ diff --git a/src/utils/file.c b/src/utils/file.c index 7febd826..b562fc13 100644 --- a/src/utils/file.c +++ b/src/utils/file.c @@ -6,32 +6,41 @@ #include "pg_probackup.h" #include "file.h" -#define PRINTF_BUF_SIZE 1024 +#define PRINTF_BUF_SIZE 1024 +#define FILE_PERMISSIONS 0600 -static pthread_mutex_t fio_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t fio_read_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t fio_write_mutex = PTHREAD_MUTEX_INITIALIZER; static unsigned long fio_fdset = 0; static void* fio_stdin_buffer; static int fio_stdout = 0; static int fio_stdin = 0; +#define fio_fileno(f) (((size_t)f - 1) | FIO_PIPE_MARKER) + void fio_redirect(int in, int out) { fio_stdin = in; fio_stdout = out; } -static bool fio_is_remote_file(FILE* fd) +static bool fio_is_remote_file(FILE* file) { - return (size_t)fd <= FIO_FDMAX; + return (size_t)file <= FIO_FDMAX; +} + +static bool fio_is_remote_fd(int fd) +{ + return (fd & FIO_PIPE_MARKER) != 0; } static bool fio_is_remote(fio_location location) { return (location == FIO_BACKUP_HOST && is_remote_agent) - || (location == FIO_DB_HOST && ssh_host != NULL); + || (location == FIO_DB_HOST && !is_remote_agent && ssh_host != NULL); } -static ssize_t fio_read(int fd, void* buf, size_t size) +static ssize_t fio_read_all(int fd, void* buf, size_t size) { size_t offs = 0; while (offs < size) @@ -42,10 +51,29 @@ static ssize_t fio_read(int fd, void* buf, size_t size) continue; } return rc; + } else if (rc == 0) { + break; } offs += rc; } - return size; + return offs; +} + +static ssize_t fio_write_all(int fd, void const* buf, size_t size) +{ + size_t offs = 0; + while (offs < size) + { + ssize_t rc = write(fd, (char*)buf + offs, size - offs); + if (rc <= 0) { + if (errno == EINTR) { + continue; + } + return rc; + } + offs += rc; + } + return offs; } FILE* fio_open_stream(char const* path, fio_location location) @@ -54,18 +82,19 @@ FILE* fio_open_stream(char const* path, fio_location location) if (fio_is_remote(location)) { fio_header hdr; - hdr.cop = FIO_READ; + hdr.cop = FIO_LOAD; hdr.size = strlen(path) + 1; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - IO_CHECK(write(fio_stdout, path, hdr.size), hdr.size); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, path, hdr.size), hdr.size); - IO_CHECK(fio_read(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr)); + Assert(hdr.cop == FIO_SEND); if (hdr.size > 0) { Assert(fio_stdin_buffer == NULL); fio_stdin_buffer = malloc(hdr.size); - IO_CHECK(fio_read(fio_stdin, fio_stdin_buffer, hdr.size), hdr.size); + IO_CHECK(fio_read_all(fio_stdin, fio_stdin_buffer, hdr.size), hdr.size); f = fmemopen(fio_stdin_buffer, hdr.size, "r"); } else @@ -90,32 +119,50 @@ int fio_close_stream(FILE* f) return fclose(f); } -FILE* fio_open(char const* path, char const* mode, fio_location location) +int fio_open(char const* path, int mode, fio_location location) { - FILE* f; + int fd; if (fio_is_remote(location)) { int i; fio_header hdr; unsigned long mask; - SYS_CHECK(pthread_mutex_lock(&fio_mutex)); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); + mask = fio_fdset; for (i = 0; (mask & 1) != 0; i++, mask >>= 1); if (i == FIO_FDMAX) { - return NULL; + return -1; } - hdr.cop = strchr(mode,'+') ? FIO_OPEN_EXISTED : FIO_OPEN_NEW; + hdr.cop = FIO_OPEN; hdr.handle = i; hdr.size = strlen(path) + 1; + hdr.arg = mode; fio_fdset |= 1 << i; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - IO_CHECK(write(fio_stdout, path, hdr.size), hdr.size); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, path, hdr.size), hdr.size); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); - f = (FILE*)(size_t)(i + 1); + fd = i | FIO_PIPE_MARKER; + } + else + { + fd = open(path, mode, FILE_PERMISSIONS); + } + return fd; +} + +FILE* fio_fopen(char const* path, char const* mode, fio_location location) +{ + FILE* f; + if (fio_is_remote(location)) + { + int flags = O_RDWR|O_CREAT|PG_BINARY|(strchr(mode, '+') ? 0 : O_TRUNC); + int fd = fio_open(path, flags, location); + f = (FILE*)(size_t)((fd + 1) & ~FIO_PIPE_MARKER); } else { @@ -124,12 +171,12 @@ FILE* fio_open(char const* path, char const* mode, fio_location location) return f; } -int fio_printf(FILE* f, char const* format, ...) +int fio_fprintf(FILE* f, char const* format, ...) { int rc; va_list args; va_start (args, format); - if (fio_stdout) + if (fio_is_remote_file(f)) { char buf[PRINTF_BUF_SIZE]; #ifdef HAS_VSNPRINTF @@ -138,7 +185,7 @@ int fio_printf(FILE* f, char const* format, ...) rc = vsprintf(buf, format, args); #endif if (rc > 0) { - fio_write(f, buf, rc); + fio_fwrite(f, buf, rc); } } else @@ -149,7 +196,7 @@ int fio_printf(FILE* f, char const* format, ...) return rc; } -int fio_flush(FILE* f) +int fio_fflush(FILE* f) { int rc = 0; if (!fio_is_remote_file(f)) @@ -162,99 +209,234 @@ int fio_flush(FILE* f) return rc; } -int fio_close(FILE* f) +int fio_flush(int fd) { - if (fio_is_remote_file(f)) + return fio_is_remote_fd(fd) ? 0 : fsync(fd); +} + +int fio_fclose(FILE* f) +{ + return fio_is_remote_file(f) + ? fio_close(fio_fileno(f)) + : fclose(f); +} + +int fio_close(int fd) +{ + if (fio_is_remote_fd(fd)) { fio_header hdr; - SYS_CHECK(pthread_mutex_lock(&fio_mutex)); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); hdr.cop = FIO_CLOSE; - hdr.handle = (size_t)f - 1; + hdr.handle = fd & ~FIO_PIPE_MARKER; hdr.size = 0; fio_fdset &= ~(1 << hdr.handle); - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); return 0; } else { - return fclose(f); + return close(fd); } } -int fio_truncate(FILE* f, off_t size) +int fio_ftruncate(FILE* f, off_t size) { - if (fio_is_remote_file(f)) + return fio_is_remote_file(f) + ? fio_truncate(fio_fileno(f), size) + : ftruncate(fileno(f), size); +} + +int fio_truncate(int fd, off_t size) +{ + if (fio_is_remote_fd(fd)) { fio_header hdr; - SYS_CHECK(pthread_mutex_lock(&fio_mutex)); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); hdr.cop = FIO_TRUNCATE; - hdr.handle = (size_t)f - 1; + hdr.handle = fd & ~FIO_PIPE_MARKER; hdr.size = 0; hdr.arg = size; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); return 0; } else { - return ftruncate(fileno(f), size); + return ftruncate(fd, size); } } -int fio_seek(FILE* f, off_t offs) +int fio_fseek(FILE* f, off_t offs) { - if (fio_is_remote_file(f)) + return fio_is_remote_file(f) + ? fio_seek(fio_fileno(f), offs) + : fseek(f, offs, SEEK_SET); +} + +int fio_seek(int fd, off_t offs) +{ + if (fio_is_remote_fd(fd)) { fio_header hdr; - SYS_CHECK(pthread_mutex_lock(&fio_mutex)); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); hdr.cop = FIO_SEEK; - hdr.handle = (size_t)f - 1; + hdr.handle = fd & ~FIO_PIPE_MARKER; hdr.size = 0; hdr.arg = offs; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); return 0; } else { - return fseek(f, offs, SEEK_SET); + return lseek(fd, offs, SEEK_SET); } } -size_t fio_write(FILE* f, void const* buf, size_t size) +size_t fio_fwrite(FILE* f, void const* buf, size_t size) { - if (fio_is_remote_file(f)) + return fio_is_remote_file(f) + ? fio_write(fio_fileno(f), buf, size) + : fwrite(buf, 1, size, f); +} + +ssize_t fio_write(int fd, void const* buf, size_t size) +{ + if (fio_is_remote_fd(fd)) { fio_header hdr; - SYS_CHECK(pthread_mutex_lock(&fio_mutex)); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); hdr.cop = FIO_WRITE; - hdr.handle = (size_t)f - 1; + hdr.handle = fd & ~FIO_PIPE_MARKER; hdr.size = size; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - IO_CHECK(write(fio_stdout, buf, size), size); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, buf, size), size); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); return size; } else { - return fwrite(buf, 1, size, f); + return write(fd, buf, size); + } +} + +size_t fio_fread(FILE* f, void* buf, size_t size) +{ + return fio_is_remote_file(f) + ? fio_read(fio_fileno(f), buf, size) + : fread(buf, 1, size, f); +} + +ssize_t fio_read(int fd, void* buf, size_t size) +{ + if (fio_is_remote_fd(fd)) + { + fio_header hdr; + + hdr.cop = FIO_READ; + hdr.handle = fd & ~FIO_PIPE_MARKER; + hdr.size = 0; + hdr.arg = size; + + SYS_CHECK(pthread_mutex_lock(&fio_read_mutex)); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); + + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); + + IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr)); + Assert(hdr.cop == FIO_SEND); + IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size); + + SYS_CHECK(pthread_mutex_unlock(&fio_read_mutex)); + + return hdr.size; + } + else + { + return read(fd, buf, size); + } +} + +int fio_stat(int fd, struct stat* st) +{ + if (fio_is_remote_fd(fd)) + { + fio_header hdr; + + hdr.cop = FIO_STAT; + hdr.handle = fd & ~FIO_PIPE_MARKER; + hdr.size = 0; + + SYS_CHECK(pthread_mutex_lock(&fio_read_mutex)); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); + + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); + + IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr)); + Assert(hdr.cop == FIO_STAT); + IO_CHECK(fio_read_all(fio_stdin, st, sizeof(*st)), sizeof(*st)); + + SYS_CHECK(pthread_mutex_unlock(&fio_read_mutex)); + + return hdr.arg; + } + else + { + return fstat(fd, st); + } +} + +int fio_access(char const* path, int mode, fio_location location) +{ + if (fio_is_remote(location)) + { + fio_header hdr; + size_t path_len = strlen(path) + 1; + hdr.cop = FIO_ACCESS; + hdr.handle = -1; + hdr.size = path_len; + hdr.arg = mode; + + SYS_CHECK(pthread_mutex_lock(&fio_read_mutex)); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); + + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len); + + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); + + IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr)); + Assert(hdr.cop == FIO_ACCESS); + + SYS_CHECK(pthread_mutex_unlock(&fio_read_mutex)); + + return hdr.arg; + } + else + { + return access(path, mode); } } @@ -269,11 +451,13 @@ int fio_rename(char const* old_path, char const* new_path, fio_location location hdr.handle = -1; hdr.size = old_path_len + new_path_len; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - IO_CHECK(write(fio_stdout, old_path, old_path_len), old_path_len); - IO_CHECK(write(fio_stdout, new_path, new_path_len), new_path_len); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, old_path, old_path_len), old_path_len); + IO_CHECK(fio_write_all(fio_stdout, new_path, new_path_len), new_path_len); + + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); return 0; } else @@ -292,10 +476,12 @@ int fio_unlink(char const* path, fio_location location) hdr.handle = -1; hdr.size = path_len; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - IO_CHECK(write(fio_stdout, path, path_len), path_len); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len); + + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); return 0; } else @@ -315,10 +501,12 @@ int fio_mkdir(char const* path, int mode, fio_location location) hdr.size = path_len; hdr.arg = mode; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - IO_CHECK(write(fio_stdout, path, path_len), path_len); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len); + + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); return 0; } else @@ -338,10 +526,12 @@ int fio_chmod(char const* path, int mode, fio_location location) hdr.size = path_len; hdr.arg = mode; - IO_CHECK(write(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); - IO_CHECK(write(fio_stdout, path, path_len), path_len); + SYS_CHECK(pthread_mutex_lock(&fio_write_mutex)); - SYS_CHECK(pthread_mutex_unlock(&fio_mutex)); + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len); + + SYS_CHECK(pthread_mutex_unlock(&fio_write_mutex)); return 0; } else @@ -356,22 +546,22 @@ static void fio_send_file(int out, char const* path) fio_header hdr; void* buf = NULL; + hdr.cop = FIO_SEND; hdr.size = 0; - hdr.cop = FIO_READ; if (fd >= 0) { off_t size = lseek(fd, 0, SEEK_END); buf = malloc(size); lseek(fd, 0, SEEK_SET); - IO_CHECK(fio_read(fd, buf, size), size); + IO_CHECK(fio_read_all(fd, buf, size), size); hdr.size = size; SYS_CHECK(close(fd)); } - IO_CHECK(write(out, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); if (buf) { - IO_CHECK(write(out, buf, hdr.size), hdr.size); + IO_CHECK(fio_write_all(out, buf, hdr.size), hdr.size); free(buf); } } @@ -379,30 +569,56 @@ static void fio_send_file(int out, char const* path) void fio_communicate(int in, int out) { int fd[FIO_FDMAX]; - char buf[BLCKSZ*2]; /* need more space for page header */ + size_t buf_size = 128*1024; + char* buf = (char*)malloc(buf_size); fio_header hdr; int rc; - while ((rc = read(in, &hdr, sizeof hdr)) == sizeof(hdr)) { + while ((rc = fio_read_all(in, &hdr, sizeof hdr)) == sizeof(hdr)) { if (hdr.size != 0) { - Assert(hdr.size < sizeof(buf)); - IO_CHECK(fio_read(in, buf, hdr.size), hdr.size); + if (hdr.size > buf_size) { + buf_size = hdr.size; + buf = (char*)realloc(buf, buf_size); + } + IO_CHECK(fio_read_all(in, buf, hdr.size), hdr.size); } switch (hdr.cop) { - case FIO_READ: + case FIO_LOAD: fio_send_file(out, buf); break; - case FIO_OPEN_NEW: - SYS_CHECK(fd[hdr.handle] = open(buf, O_RDWR|O_CREAT|O_TRUNC, 0777)); - break; - case FIO_OPEN_EXISTED: - SYS_CHECK(fd[hdr.handle] = open(buf, O_RDWR|O_CREAT, 0777)); + case FIO_OPEN: + SYS_CHECK(fd[hdr.handle] = open(buf, hdr.arg, FILE_PERMISSIONS)); break; case FIO_CLOSE: SYS_CHECK(close(fd[hdr.handle])); break; case FIO_WRITE: - IO_CHECK(write(fd[hdr.handle], buf, hdr.size), hdr.size); + IO_CHECK(fio_write_all(fd[hdr.handle], buf, hdr.size), hdr.size); + break; + case FIO_READ: + if ((size_t)hdr.arg > buf_size) { + buf_size = hdr.arg; + buf = (char*)realloc(buf, buf_size); + } + rc = read(fd[hdr.handle], buf, hdr.arg); + hdr.cop = FIO_SEND; + hdr.size = rc > 0 ? rc : 0; + IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(out, buf, rc), rc); + break; + case FIO_STAT: + { + struct stat st; + hdr.size = sizeof(st); + hdr.arg = fstat(fd[hdr.handle], &st); + IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(out, &st, sizeof(st)), sizeof(st)); + break; + } + case FIO_ACCESS: + hdr.size = 0; + hdr.arg = access(buf, hdr.arg); + IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); break; case FIO_RENAME: SYS_CHECK(rename(buf, buf + strlen(buf))); @@ -426,7 +642,7 @@ void fio_communicate(int in, int out) Assert(false); } } - + free(buf); if (rc != 0) { perror("read"); exit(EXIT_FAILURE); diff --git a/src/utils/file.h b/src/utils/file.h index c5ec6118..3b7cc112 100644 --- a/src/utils/file.h +++ b/src/utils/file.h @@ -5,8 +5,7 @@ typedef enum { - FIO_OPEN_NEW, - FIO_OPEN_EXISTED, + FIO_OPEN, FIO_CLOSE, FIO_WRITE, FIO_RENAME, @@ -15,7 +14,11 @@ typedef enum FIO_CHMOD, FIO_SEEK, FIO_TRUNCATE, - FIO_READ + FIO_READ, + FIO_LOAD, + FIO_STAT, + FIO_SEND, + FIO_ACCESS } fio_operations; typedef enum @@ -25,9 +28,10 @@ typedef enum } fio_location; #define FIO_FDMAX 64 +#define FIO_PIPE_MARKER 0x40000000 #define SYS_CHECK(cmd) do if ((cmd) < 0) { perror(#cmd); exit(EXIT_FAILURE); } while (0) -#define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (size)) { fprintf(stderr, "Receive %d bytes instead of %d\n", _rc, (int)(size)); exit(EXIT_FAILURE); } } while (0) +#define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (size)) { fprintf(stderr, "%s:%d: proceeds %d bytes instead of %d\n", __FILE__, __LINE__, _rc, (int)(size)); exit(EXIT_FAILURE); } } while (0) typedef struct { unsigned cop : 4; @@ -36,20 +40,32 @@ typedef struct { unsigned arg; } fio_header; -extern void fio_redirect(int in, int out); -extern void fio_communicate(int in, int out); +extern void fio_redirect(int in, int out); +extern void fio_communicate(int in, int out); -extern FILE* fio_open(char const* name, char const* mode, fio_location location); -extern int fio_close(FILE* f); -extern size_t fio_write(FILE* f, void const* buf, size_t size); -extern int fio_printf(FILE* f, char const* arg, ...) __attribute__((format(printf, 2, 3))); -extern int fio_flush(FILE* f); -extern int fio_seek(FILE* f, off_t offs); -extern int fio_truncate(FILE* f, off_t size); -extern int fio_rename(char const* old_path, char const* new_path, fio_location location); -extern int fio_unlink(char const* path, fio_location location); -extern int fio_mkdir(char const* path, int mode, fio_location location); -extern int fio_chmod(char const* path, int mode, fio_location location); +extern FILE* fio_fopen(char const* name, char const* mode, fio_location location); +extern size_t fio_fwrite(FILE* f, void const* buf, size_t size); +extern size_t fio_fread(FILE* f, void* buf, size_t size); +extern int fio_fprintf(FILE* f, char const* arg, ...) __attribute__((format(printf, 2, 3))); +extern int fio_fflush(FILE* f); +extern int fio_fseek(FILE* f, off_t offs); +extern int fio_ftruncate(FILE* f, off_t size); +extern int fio_fclose(FILE* f); + +extern int fio_open(char const* name, int mode, fio_location location); +extern ssize_t fio_write(int fd, void const* buf, size_t size); +extern ssize_t fio_read(int fd, void* buf, size_t size); +extern int fio_flush(int fd); +extern int fio_seek(int fd, off_t offs); +extern int fio_stat(int fd, struct stat* st); +extern int fio_truncate(int fd, off_t size); +extern int fio_close(int fd); + +extern int fio_rename(char const* old_path, char const* new_path, fio_location location); +extern int fio_unlink(char const* path, fio_location location); +extern int fio_mkdir(char const* path, int mode, fio_location location); +extern int fio_chmod(char const* path, int mode, fio_location location); +extern int fio_access(char const* path, int mode, fio_location location); extern FILE* fio_open_stream(char const* name, fio_location location); extern int fio_close_stream(FILE* f); diff --git a/src/utils/pgut.c b/src/utils/pgut.c index ba4b077e..0c58ae26 100644 --- a/src/utils/pgut.c +++ b/src/utils/pgut.c @@ -2164,23 +2164,6 @@ pgut_strdup(const char *str) return ret; } -FILE * -pgut_fopen(const char *path, const char *mode, bool missing_ok) -{ - FILE *fp; - - if ((fp = fopen(path, mode)) == NULL) - { - if (missing_ok && errno == ENOENT) - return NULL; - - elog(ERROR, "could not open file \"%s\": %s", - path, strerror(errno)); - } - - return fp; -} - #ifdef WIN32 static int select_win32(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, const struct timeval * timeout); #define select select_win32 diff --git a/src/utils/pgut.h b/src/utils/pgut.h index 9aac75ca..8150a27b 100644 --- a/src/utils/pgut.h +++ b/src/utils/pgut.h @@ -132,11 +132,6 @@ extern char *pgut_strdup(const char *str); #define pgut_new(type) ((type *) pgut_malloc(sizeof(type))) #define pgut_newarray(type, n) ((type *) pgut_malloc(sizeof(type) * (n))) -/* - * file operations - */ -extern FILE *pgut_fopen(const char *path, const char *mode, bool missing_ok); - /* * Assert */ diff --git a/src/walmethods.c b/src/walmethods.c new file mode 100644 index 00000000..d384b99f --- /dev/null +++ b/src/walmethods.c @@ -0,0 +1,384 @@ +/*------------------------------------------------------------------------- + * + * walmethods.c - implementations of different ways to write received wal + * + * NOTE! The caller must ensure that only one method is instantiated in + * any given program, and that it's only instantiated once! + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/walmethods.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include +#include +#include +#ifdef HAVE_LIBZ +#include +#endif + +#include "pgtar.h" +#include "common/file_perm.h" +#include "common/file_utils.h" + +#include "receivelog.h" +#include "streamutil.h" +#include "pg_probackup.h" + +/* Size of zlib buffer for .tar.gz */ +#define ZLIB_OUT_SIZE 4096 + +/*------------------------------------------------------------------------- + * WalDirectoryMethod - write wal to a directory looking like pg_wal + *------------------------------------------------------------------------- + */ + +/* + * Global static data for this method + */ +typedef struct DirectoryMethodData +{ + char *basedir; + int compression; + bool sync; +} DirectoryMethodData; +static DirectoryMethodData *dir_data = NULL; + +/* + * Local file handle + */ +typedef struct DirectoryMethodFile +{ + int fd; + off_t currpos; + char *pathname; + char *fullpath; + char *temp_suffix; +#ifdef HAVE_LIBZ + gzFile gzfp; +#endif +} DirectoryMethodFile; + +static const char * +dir_getlasterror(void) +{ + /* Directory method always sets errno, so just use strerror */ + return strerror(errno); +} + +static Walfile +dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) +{ + static char tmppath[MAXPGPATH]; + int fd; + DirectoryMethodFile *f; +#ifdef HAVE_LIBZ + gzFile gzfp = NULL; +#endif + + snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s", + dir_data->basedir, pathname, + dir_data->compression > 0 ? ".gz" : "", + temp_suffix ? temp_suffix : ""); + + /* + * Open a file for non-compressed as well as compressed files. Tracking + * the file descriptor is important for dir_sync() method as gzflush() + * does not do any system calls to fsync() to make changes permanent on + * disk. + */ + fd = fio_open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, FIO_BACKUP_HOST); + if (fd < 0) + return NULL; + +#ifdef HAVE_LIBZ + if (dir_data->compression > 0) + { + gzfp = gzdopen(fd, "wb"); + if (gzfp == NULL) + { + close(fd); + return NULL; + } + + if (gzsetparams(gzfp, dir_data->compression, + Z_DEFAULT_STRATEGY) != Z_OK) + { + gzclose(gzfp); + return NULL; + } + } +#endif + + /* Do pre-padding on non-compressed files */ + if (pad_to_size && dir_data->compression == 0) + { + PGAlignedXLogBlock zerobuf; + int bytes; + + memset(zerobuf.data, 0, XLOG_BLCKSZ); + for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ) + { + errno = 0; + if (fio_write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ) + { + int save_errno = errno; + + fio_close(fd); + + /* + * If write didn't set errno, assume problem is no disk space. + */ + errno = save_errno ? save_errno : ENOSPC; + return NULL; + } + } + + if (fio_seek(fd, 0) != 0) + { + int save_errno = errno; + + fio_close(fd); + errno = save_errno; + return NULL; + } + } + + /* + * fsync WAL file and containing directory, to ensure the file is + * persistently created and zeroed (if padded). That's particularly + * important when using synchronous mode, where the file is modified and + * fsynced in-place, without a directory fsync. + */ + if (!is_remote_agent && dir_data->sync) + { + if (fsync_fname(tmppath, false, progname) != 0 || + fsync_parent_path(tmppath, progname) != 0) + { +#ifdef HAVE_LIBZ + if (dir_data->compression > 0) + gzclose(gzfp); + else +#endif + close(fd); + return NULL; + } + } + + f = pg_malloc0(sizeof(DirectoryMethodFile)); +#ifdef HAVE_LIBZ + if (dir_data->compression > 0) + f->gzfp = gzfp; +#endif + f->fd = fd; + f->currpos = 0; + f->pathname = pg_strdup(pathname); + f->fullpath = pg_strdup(tmppath); + if (temp_suffix) + f->temp_suffix = pg_strdup(temp_suffix); + + return f; +} + +static ssize_t +dir_write(Walfile f, const void *buf, size_t count) +{ + ssize_t r; + DirectoryMethodFile *df = (DirectoryMethodFile *) f; + + Assert(f != NULL); + +#ifdef HAVE_LIBZ + if (dir_data->compression > 0) + r = (ssize_t) gzwrite(df->gzfp, buf, count); + else +#endif + r = fio_write(df->fd, buf, count); + if (r > 0) + df->currpos += r; + return r; +} + +static off_t +dir_get_current_pos(Walfile f) +{ + Assert(f != NULL); + + /* Use a cached value to prevent lots of reseeks */ + return ((DirectoryMethodFile *) f)->currpos; +} + +static int +dir_close(Walfile f, WalCloseMethod method) +{ + int r; + DirectoryMethodFile *df = (DirectoryMethodFile *) f; + static char tmppath[MAXPGPATH]; + static char tmppath2[MAXPGPATH]; + + Assert(f != NULL); + +#ifdef HAVE_LIBZ + if (dir_data->compression > 0) + r = gzclose(df->gzfp); + else +#endif + r = fio_close(df->fd); + + if (r == 0) + { + /* Build path to the current version of the file */ + if (method == CLOSE_NORMAL && df->temp_suffix) + { + /* + * If we have a temp prefix, normal operation is to rename the + * file. + */ + snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s", + dir_data->basedir, df->pathname, + dir_data->compression > 0 ? ".gz" : "", + df->temp_suffix); + snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s", + dir_data->basedir, df->pathname, + dir_data->compression > 0 ? ".gz" : ""); + r = durable_rename(tmppath, tmppath2, progname); + } + else if (method == CLOSE_UNLINK) + { + /* Unlink the file once it's closed */ + snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s", + dir_data->basedir, df->pathname, + dir_data->compression > 0 ? ".gz" : "", + df->temp_suffix ? df->temp_suffix : ""); + r = unlink(tmppath); + } + else + { + /* + * Else either CLOSE_NORMAL and no temp suffix, or + * CLOSE_NO_RENAME. In this case, fsync the file and containing + * directory if sync mode is requested. + */ + if (dir_data->sync && !is_remote_agent) + { + r = fsync_fname(df->fullpath, false, progname); + if (r == 0) + r = fsync_parent_path(df->fullpath, progname); + } + } + } + + pg_free(df->pathname); + pg_free(df->fullpath); + if (df->temp_suffix) + pg_free(df->temp_suffix); + pg_free(df); + + return r; +} + +static int +dir_sync(Walfile f) +{ + Assert(f != NULL); + + if (!dir_data->sync) + return 0; + +#ifdef HAVE_LIBZ + if (dir_data->compression > 0) + { + if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK) + return -1; + } +#endif + + return fio_flush(((DirectoryMethodFile *) f)->fd); +} + +static ssize_t +dir_get_file_size(const char *pathname) +{ + struct stat statbuf; + static char tmppath[MAXPGPATH]; + int fd; + + snprintf(tmppath, sizeof(tmppath), "%s/%s", + dir_data->basedir, pathname); + + fd = fio_open(tmppath, O_RDONLY|PG_BINARY, FIO_BACKUP_HOST); + if (fd >= 0) + { + if (fio_stat(fd, &statbuf) != 0) + { + fio_close(fd); + return -1; + } + fio_close(fd); + return statbuf.st_size; + } + return -1; +} + +static bool +dir_existsfile(const char *pathname) +{ + static char tmppath[MAXPGPATH]; + + snprintf(tmppath, sizeof(tmppath), "%s/%s", + dir_data->basedir, pathname); + + return fio_access(tmppath, F_OK, FIO_BACKUP_HOST) == 0; +} + +static bool +dir_finish(void) +{ + if (dir_data->sync && !is_remote_agent) + { + /* + * Files are fsynced when they are closed, but we need to fsync the + * directory entry here as well. + */ + if (fsync_fname(dir_data->basedir, true, progname) != 0) + return false; + } + return true; +} + + +WalWriteMethod * +CreateWalDirectoryMethod(const char *basedir, int compression, bool sync) +{ + WalWriteMethod *method; + + method = pg_malloc0(sizeof(WalWriteMethod)); + method->open_for_write = dir_open_for_write; + method->write = dir_write; + method->get_current_pos = dir_get_current_pos; + method->get_file_size = dir_get_file_size; + method->close = dir_close; + method->sync = dir_sync; + method->existsfile = dir_existsfile; + method->finish = dir_finish; + method->getlasterror = dir_getlasterror; + + dir_data = pg_malloc0(sizeof(DirectoryMethodData)); + dir_data->compression = compression; + dir_data->basedir = pg_strdup(basedir); + dir_data->sync = sync; + + return method; +} + +void +FreeWalDirectoryMethod(void) +{ + pg_free(dir_data->basedir); + pg_free(dir_data); +} + diff --git a/src/walmethods.h b/src/walmethods.h new file mode 100644 index 00000000..76eb249b --- /dev/null +++ b/src/walmethods.h @@ -0,0 +1,92 @@ +/*------------------------------------------------------------------------- + * + * walmethods.h + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/walmethods.h + *------------------------------------------------------------------------- + */ + + +typedef void *Walfile; + +typedef enum +{ + CLOSE_NORMAL, + CLOSE_UNLINK, + CLOSE_NO_RENAME +} WalCloseMethod; + +/* + * A WalWriteMethod structure represents the different methods used + * to write the streaming WAL as it's received. + * + * All methods that have a failure return indicator will set state + * allowing the getlasterror() method to return a suitable message. + * Commonly, errno is this state (or part of it); so callers must take + * care not to clobber errno between a failed method call and use of + * getlasterror() to retrieve the message. + */ +typedef struct WalWriteMethod WalWriteMethod; +struct WalWriteMethod +{ + /* + * Open a target file. Returns Walfile, or NULL if open failed. If a temp + * suffix is specified, a file with that name will be opened, and then + * automatically renamed in close(). If pad_to_size is specified, the file + * will be padded with NUL up to that size, if supported by the Walmethod. + */ + Walfile (*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size); + + /* + * Close an open Walfile, using one or more methods for handling automatic + * unlinking etc. Returns 0 on success, other values for error. + */ + int (*close) (Walfile f, WalCloseMethod method); + + /* Check if a file exist */ + bool (*existsfile) (const char *pathname); + + /* Return the size of a file, or -1 on failure. */ + ssize_t (*get_file_size) (const char *pathname); + + /* + * Write count number of bytes to the file, and return the number of bytes + * actually written or -1 for error. + */ + ssize_t (*write) (Walfile f, const void *buf, size_t count); + + /* Return the current position in a file or -1 on error */ + off_t (*get_current_pos) (Walfile f); + + /* + * fsync the contents of the specified file. Returns 0 on success. + */ + int (*sync) (Walfile f); + + /* + * Clean up the Walmethod, closing any shared resources. For methods like + * tar, this includes writing updated headers. Returns true if the + * close/write/sync of shared resources succeeded, otherwise returns false + * (but the resources are still closed). + */ + bool (*finish) (void); + + /* Return a text for the last error in this Walfile */ + const char *(*getlasterror) (void); +}; + +/* + * Available WAL methods: + * - WalDirectoryMethod - write WAL to regular files in a standard pg_wal + * - TarDirectoryMethod - write WAL to a tarfile corresponding to pg_wal + * (only implements the methods required for pg_basebackup, + * not all those required for pg_receivewal) + */ +WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, + int compression, bool sync); + +/* Cleanup routines for previously-created methods */ +void FreeWalDirectoryMethod(void);