diff --git a/src/data.c b/src/data.c index e3725395..f5cfeefb 100644 --- a/src/data.c +++ b/src/data.c @@ -989,7 +989,7 @@ copy_meta(const char *from_path, const char *to_path, bool unlink_on_error, fio_ if (stat(from_path, &st) == -1) { if (unlink_on_error) - unlink(to_path); + fio_unlink(to_path, location); elog(ERROR, "Cannot stat file \"%s\": %s", from_path, strerror(errno)); } @@ -997,7 +997,7 @@ copy_meta(const char *from_path, const char *to_path, bool unlink_on_error, fio_ if (fio_chmod(to_path, st.st_mode, location) == -1) { if (unlink_on_error) - unlink(to_path); + fio_unlink(to_path, location); elog(ERROR, "Cannot change mode of file \"%s\": %s", to_path, strerror(errno)); } @@ -1020,6 +1020,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, #ifdef HAVE_LIBZ char gz_to_path[MAXPGPATH]; gzFile gz_out = NULL; + int gz_tmp = -1; #endif /* open file for read */ @@ -1039,7 +1040,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path); - gz_out = gzopen(to_path_temp, PG_BINARY_W); + gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, &gz_tmp, FIO_BACKUP_HOST); if (gzsetparams(gz_out, compress_level, Z_DEFAULT_STRATEGY) != Z_OK) elog(ERROR, "Cannot set compression level %d to file \"%s\": %s", compress_level, to_path_temp, get_gz_error(gz_out, errno)); @@ -1070,7 +1071,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, if (ferror(in)) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_BACKUP_HOST); elog(ERROR, "Cannot read source WAL file \"%s\": %s", from_path, strerror(errno_temp)); @@ -1084,7 +1085,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, if (gzwrite(gz_out, buf, read_len) != read_len) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_BACKUP_HOST); elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s", to_path_temp, get_gz_error(gz_out, errno_temp)); } @@ -1095,7 +1096,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, if (fio_fwrite(out, buf, read_len) != read_len) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_BACKUP_HOST); elog(ERROR, "Cannot write to WAL file \"%s\": %s", to_path_temp, strerror(errno_temp)); } @@ -1109,10 +1110,10 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, #ifdef HAVE_LIBZ if (is_compress) { - if (gzclose(gz_out) != 0) + if (fio_gzclose(gz_out, to_path_temp, gz_tmp) != 0) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_BACKUP_HOST); elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", to_path_temp, get_gz_error(gz_out, errno_temp)); } @@ -1124,7 +1125,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, fio_fclose(out)) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_BACKUP_HOST); elog(ERROR, "Cannot write WAL file \"%s\": %s", to_path_temp, strerror(errno_temp)); } @@ -1133,7 +1134,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, if (fclose(in)) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_BACKUP_HOST); elog(ERROR, "Cannot close source WAL file \"%s\": %s", from_path, strerror(errno_temp)); } @@ -1144,7 +1145,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_BACKUP_HOST); elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s", to_path_temp, to_path_p, strerror(errno_temp)); } @@ -1229,7 +1230,7 @@ get_wal_file(const char *from_path, const char *to_path) if (read_len != sizeof(buf) && !gzeof(gz_in)) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_DB_HOST); elog(ERROR, "Cannot read compressed WAL file \"%s\": %s", gz_from_path, get_gz_error(gz_in, errno_temp)); } @@ -1241,7 +1242,7 @@ get_wal_file(const char *from_path, const char *to_path) if (ferror(in)) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_DB_HOST); elog(ERROR, "Cannot read source WAL file \"%s\": %s", from_path, strerror(errno_temp)); } @@ -1252,7 +1253,7 @@ get_wal_file(const char *from_path, const char *to_path) if (fio_fwrite(out, buf, read_len) != read_len) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_DB_HOST); elog(ERROR, "Cannot write to WAL file \"%s\": %s", to_path_temp, strerror(errno_temp)); } @@ -1277,7 +1278,7 @@ get_wal_file(const char *from_path, const char *to_path) fio_fclose(out)) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_DB_HOST); elog(ERROR, "Cannot write WAL file \"%s\": %s", to_path_temp, strerror(errno_temp)); } @@ -1288,7 +1289,7 @@ get_wal_file(const char *from_path, const char *to_path) if (gzclose(gz_in) != 0) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_DB_HOST); elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", gz_from_path, get_gz_error(gz_in, errno_temp)); } @@ -1299,7 +1300,7 @@ get_wal_file(const char *from_path, const char *to_path) if (fclose(in)) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_DB_HOST); elog(ERROR, "Cannot close source WAL file \"%s\": %s", from_path, strerror(errno_temp)); } @@ -1311,7 +1312,7 @@ get_wal_file(const char *from_path, const char *to_path) if (fio_rename(to_path_temp, to_path, FIO_DB_HOST) < 0) { errno_temp = errno; - unlink(to_path_temp); + fio_unlink(to_path_temp, FIO_DB_HOST); elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s", to_path_temp, to_path, strerror(errno_temp)); } diff --git a/src/parsexlog.c b/src/parsexlog.c index 0eb4d1fb..3c34d506 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -102,6 +102,7 @@ typedef struct XLogPageReadPrivate #ifdef HAVE_LIBZ gzFile gz_xlogfile; char gz_xlogpath[MAXPGPATH]; + int gz_tmp; #endif } XLogPageReadPrivate; @@ -885,8 +886,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, private_data->thread_num, private_data->gz_xlogpath); private_data->xlogexists = true; - private_data->gz_xlogfile = gzopen(private_data->gz_xlogpath, - "rb"); + private_data->gz_xlogfile = fio_gzopen(private_data->gz_xlogpath, + "rb", &private_data->gz_tmp, private_data->location); if (private_data->gz_xlogfile == NULL) { elog(WARNING, "Thread [%d]: Could not open compressed WAL segment \"%s\": %s", @@ -1000,7 +1001,7 @@ CleanupXLogPageRead(XLogReaderState *xlogreader) #ifdef HAVE_LIBZ else if (private_data->gz_xlogfile != NULL) { - gzclose(private_data->gz_xlogfile); + fio_gzclose(private_data->gz_xlogfile, private_data->gz_xlogpath, private_data->gz_tmp); private_data->gz_xlogfile = NULL; } #endif diff --git a/src/utils/file.c b/src/utils/file.c index cbbd0aa3..ad69a60c 100644 --- a/src/utils/file.c +++ b/src/utils/file.c @@ -36,7 +36,8 @@ static bool fio_is_remote_fd(int fd) static bool fio_is_remote(fio_location location) { - return (location == FIO_BACKUP_HOST && is_remote_agent) + return location == FIO_REMOTE_HOST + || (location == FIO_BACKUP_HOST && is_remote_agent) || (location == FIO_DB_HOST && !is_remote_agent && ssh_host != NULL); } @@ -581,6 +582,61 @@ int fio_chmod(char const* path, int mode, fio_location location) } } +#ifdef HAVE_LIBZ +gzFile fio_gzopen(char const* path, char const* mode, int* tmp_fd, fio_location location) +{ + gzFile file; + if (fio_is_remote(location)) + { + int fd = mkstemp("gz.XXXXXX"); + if (fd < 0) + return NULL; + *tmp_fd = fd; + file = gzdopen(fd, mode); + } + else + { + *tmp_fd = -1; + file = gzopen(path, mode); + } + return file; +} + +int fio_gzclose(gzFile file, char const* path, int tmp_fd) +{ + if (tmp_fd >= 0) + { + off_t size; + void* buf; + int fd; + + SYS_CHECK(gzflush(file, Z_FINISH)); + + size = lseek(tmp_fd, 0, SEEK_END); + buf = malloc(size); + + lseek(tmp_fd, 0, SEEK_SET); + IO_CHECK(read(tmp_fd, buf, size), size); + + SYS_CHECK(gzclose(file)); /* should close tmp_fd */ + + fd = fio_open(path, O_RDWR|O_CREAT|O_TRUNC, FILE_PERMISSIONS); + if (fd < 0) { + free(buf); + return -1; + } + IO_CHECK(fio_write(fd, buf, size), size); + free(buf); + return fio_close(fd); + } + else + { + return gzclose(file); + } +} +#endif + + static void fio_send_file(int out, char const* path) { int fd = open(path, O_RDONLY); diff --git a/src/utils/file.h b/src/utils/file.h index 123167a7..875f0c32 100644 --- a/src/utils/file.h +++ b/src/utils/file.h @@ -3,6 +3,10 @@ #include +#ifdef HAVE_LIBZ +#include +#endif + typedef enum { FIO_OPEN, @@ -27,7 +31,8 @@ typedef enum { FIO_LOCAL_HOST, FIO_DB_HOST, - FIO_BACKUP_HOST + FIO_BACKUP_HOST, + FIO_REMOTE_HOST } fio_location; #define FIO_FDMAX 64 @@ -82,6 +87,11 @@ extern int fio_stat(char const* path, struct stat* st, bool follow_symlinks, extern FILE* fio_open_stream(char const* name, fio_location location); extern int fio_close_stream(FILE* f); +#ifdef HAVE_LIBZ +extern gzFile fio_gzopen(char const* path, char const* mode, int* tmp_fd, fio_location location); +extern int fio_gzclose(gzFile file, char const* path, int tmp_fd); +#endif + extern void fio_transfer(void* addr, size_t value); #endif diff --git a/src/walmethods.c b/src/walmethods.c index 39f4f990..802a8e9c 100644 --- a/src/walmethods.c +++ b/src/walmethods.c @@ -61,6 +61,7 @@ typedef struct DirectoryMethodFile char *temp_suffix; #ifdef HAVE_LIBZ gzFile gzfp; + int gz_tmp; #endif } DirectoryMethodFile; @@ -74,11 +75,12 @@ dir_getlasterror(void) static Walfile dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size) { - static char tmppath[MAXPGPATH]; - int fd; + char tmppath[MAXPGPATH]; + int fd = -1; DirectoryMethodFile *f; #ifdef HAVE_LIBZ gzFile gzfp = NULL; + int gz_tmp = -1; #endif snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s", @@ -92,17 +94,12 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ * does not do any system calls to fsync() to make changes permanent on * disk. */ - fd = fio_open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, FIO_BACKUP_HOST); - if (fd < 0) - return NULL; - #ifdef HAVE_LIBZ if (dir_data->compression > 0) { - gzfp = gzdopen(fd, "wb"); + gzfp = fio_gzopen(tmppath, "wb", &gz_tmp, FIO_BACKUP_HOST); if (gzfp == NULL) { - close(fd); return NULL; } @@ -113,8 +110,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ return NULL; } } + else #endif - + { + fd = fio_open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, FIO_BACKUP_HOST); + if (fd < 0) + return NULL; + } /* Do pre-padding on non-compressed files */ if (pad_to_size && dir_data->compression == 0) { @@ -173,7 +175,10 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_ f = pg_malloc0(sizeof(DirectoryMethodFile)); #ifdef HAVE_LIBZ if (dir_data->compression > 0) + { f->gzfp = gzfp; + f->gz_tmp = gz_tmp; + } #endif f->fd = fd; f->currpos = 0; @@ -218,14 +223,14 @@ dir_close(Walfile f, WalCloseMethod method) { int r; DirectoryMethodFile *df = (DirectoryMethodFile *) f; - static char tmppath[MAXPGPATH]; - static char tmppath2[MAXPGPATH]; + char tmppath[MAXPGPATH]; + char tmppath2[MAXPGPATH]; Assert(f != NULL); #ifdef HAVE_LIBZ if (dir_data->compression > 0) - r = gzclose(df->gzfp); + r = fio_gzclose(df->gzfp, df->fullpath, df->gz_tmp); else #endif r = fio_close(df->fd); @@ -248,7 +253,7 @@ dir_close(Walfile f, WalCloseMethod method) snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s", dir_data->basedir, df->pathname, dir_data->compression > 0 ? ".gz" : ""); - r = durable_rename(tmppath, tmppath2, progname); + r = fio_rename(tmppath, tmppath2, FIO_BACKUP_HOST); file_path = tmppath2; } else if (method == CLOSE_UNLINK) @@ -258,7 +263,7 @@ dir_close(Walfile f, WalCloseMethod method) dir_data->basedir, df->pathname, dir_data->compression > 0 ? ".gz" : "", df->temp_suffix ? df->temp_suffix : ""); - r = unlink(tmppath); + r = fio_unlink(tmppath, FIO_BACKUP_HOST); } else { @@ -267,7 +272,7 @@ dir_close(Walfile f, WalCloseMethod method) * CLOSE_NO_RENAME. In this case, fsync the file and containing * directory if sync mode is requested. */ - file_path = df->fullpath; + file_path = df->fullpath; if (dir_data->sync && !is_remote_agent) { r = fsync_fname(df->fullpath, false, progname); @@ -315,7 +320,7 @@ static ssize_t dir_get_file_size(const char *pathname) { struct stat statbuf; - static char tmppath[MAXPGPATH]; + char tmppath[MAXPGPATH]; snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, pathname); @@ -329,7 +334,7 @@ dir_get_file_size(const char *pathname) static bool dir_existsfile(const char *pathname) { - static char tmppath[MAXPGPATH]; + char tmppath[MAXPGPATH]; snprintf(tmppath, sizeof(tmppath), "%s/%s", dir_data->basedir, pathname);