diff --git a/src/archive.c b/src/archive.c index 7df3837d..08d7825d 100644 --- a/src/archive.c +++ b/src/archive.c @@ -414,8 +414,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d { FILE *in = NULL; int out = -1; - char buf[STDIO_BUFSIZE]; -// char buf[XLOG_BLCKSZ]; + char *buf = pgut_malloc(OUT_BUF_SIZE); /* 1MB buffer */ char from_fullpath[MAXPGPATH]; char to_fullpath[MAXPGPATH]; /* partial handling */ @@ -433,11 +432,14 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d canonicalize_path(to_fullpath); /* Open source file for read */ - in = fio_fopen(from_fullpath, PG_BINARY_R, FIO_DB_HOST); + in = fopen(from_fullpath, PG_BINARY_R); if (in == NULL) elog(ERROR, "Thread [%d]: Cannot open source file \"%s\": %s", thread_num, from_fullpath, strerror(errno)); + /* disable stdio buffering for input file */ + setvbuf(in, NULL, _IONBF, BUFSIZ); + /* open destination partial file for write */ snprintf(to_fullpath_part, sizeof(to_fullpath_part), "%s.part", to_fullpath); @@ -542,14 +544,14 @@ part_opened: pg_crc32 crc32_dst; crc32_src = fio_get_crc32(from_fullpath, FIO_DB_HOST, false); - crc32_dst = fio_get_crc32(to_fullpath, FIO_DB_HOST, false); + crc32_dst = fio_get_crc32(to_fullpath, FIO_BACKUP_HOST, false); if (crc32_src == crc32_dst) { elog(LOG, "Thread [%d]: WAL file already exists in archive with the same " "checksum, skip pushing: \"%s\"", thread_num, from_fullpath); /* cleanup */ - fio_fclose(in); + fclose(in); fio_close(out); fio_unlink(to_fullpath_part, FIO_BACKUP_HOST); return 1; @@ -574,11 +576,11 @@ part_opened: /* copy content */ for (;;) { - ssize_t read_len = 0; + size_t read_len = 0; - read_len = fio_fread(in, buf, sizeof(buf)); + read_len = fread(buf, 1, OUT_BUF_SIZE, in); - if (read_len < 0) + if (ferror(in)) { fio_unlink(to_fullpath_part, FIO_BACKUP_HOST); elog(ERROR, "Thread [%d]: Cannot read source file \"%s\": %s", @@ -595,17 +597,12 @@ part_opened: } } - if (read_len == 0) + if (feof(in)) break; } /* close source file */ - if (fio_fclose(in)) - { - fio_unlink(to_fullpath_part, FIO_BACKUP_HOST); - elog(ERROR, "Thread [%d]: Cannot close source WAL file \"%s\": %s", - thread_num, from_fullpath, strerror(errno)); - } + fclose(in); /* close temp file */ if (fio_close(out) != 0) @@ -636,6 +633,7 @@ part_opened: thread_num, to_fullpath_part, to_fullpath, strerror(errno)); } + pg_free(buf); return 0; } @@ -654,8 +652,7 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, { FILE *in = NULL; gzFile out = NULL; - int errno_temp; - char buf[STDIO_BUFSIZE]; + char *buf = pgut_malloc(OUT_BUF_SIZE); char from_fullpath[MAXPGPATH]; char to_fullpath[MAXPGPATH]; char to_fullpath_gz[MAXPGPATH]; @@ -681,11 +678,14 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir, snprintf(to_fullpath_gz_part, sizeof(to_fullpath_gz_part), "%s.part", to_fullpath_gz); /* Open source file for read */ - in = fio_fopen(from_fullpath, PG_BINARY_R, FIO_DB_HOST); + in = fopen(from_fullpath, PG_BINARY_R); if (in == NULL) elog(ERROR, "Thread [%d]: Cannot open source WAL file \"%s\": %s", thread_num, from_fullpath, strerror(errno)); + /* disable stdio buffering for input file */ + setvbuf(in, NULL, _IONBF, BUFSIZ); + /* Grab lock by creating temp file in exclusive mode */ out = fio_gzopen(to_fullpath_gz_part, PG_BINARY_W, compress_level, FIO_BACKUP_HOST); if (out == NULL) @@ -787,16 +787,16 @@ part_opened: pg_crc32 crc32_src; pg_crc32 crc32_dst; - /* what if one of them goes missing */ + /* TODO: what if one of them goes missing? */ crc32_src = fio_get_crc32(from_fullpath, FIO_DB_HOST, false); - crc32_dst = fio_get_crc32(to_fullpath_gz, FIO_DB_HOST, true); + crc32_dst = fio_get_crc32(to_fullpath_gz, FIO_BACKUP_HOST, true); if (crc32_src == crc32_dst) { elog(LOG, "Thread [%d]: WAL file already exists in archive with the same " "checksum, skip pushing: \"%s\"", thread_num, from_fullpath); /* cleanup */ - fio_fclose(in); + fclose(in); fio_gzclose(out); fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST); return 1; @@ -811,8 +811,6 @@ part_opened: /* Overwriting is forbidden, * so we must unlink partial file and exit with error. */ - fio_fclose(in); - fio_gzclose(out); fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST); elog(ERROR, "Thread [%d]: WAL file already exists in archive with " "different checksum: \"%s\"", thread_num, to_fullpath_gz); @@ -823,11 +821,11 @@ part_opened: /* copy content */ for (;;) { - ssize_t read_len = 0; + size_t read_len = 0; - read_len = fio_fread(in, buf, sizeof(buf)); + read_len = fread(buf, 1, OUT_BUF_SIZE, in); - if (read_len < 0) + if (ferror(in)) { fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST); elog(ERROR, "Thread [%d]: Cannot read from source file \"%s\": %s", @@ -838,32 +836,25 @@ part_opened: { if (fio_gzwrite(out, buf, read_len) != read_len) { - errno_temp = errno; fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST); elog(ERROR, "Thread [%d]: Cannot write to compressed temp WAL file \"%s\": %s", - thread_num, to_fullpath_gz_part, get_gz_error(out, errno_temp)); + thread_num, to_fullpath_gz_part, get_gz_error(out, errno)); } } - if (read_len == 0) + if (feof(in)) break; } /* close source file */ - if (fio_fclose(in)) - { - fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST); - elog(ERROR, "Thread [%d]: Cannot close source WAL file \"%s\": %s", - thread_num, from_fullpath, strerror(errno)); - } + fclose(in); /* close temp file */ if (fio_gzclose(out) != 0) { - errno_temp = errno; fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST); elog(ERROR, "Thread [%d]: Cannot close compressed temp WAL file \"%s\": %s", - thread_num, to_fullpath_gz_part, strerror(errno_temp)); + thread_num, to_fullpath_gz_part, strerror(errno)); } /* sync temp file to disk */ @@ -887,6 +878,8 @@ part_opened: thread_num, to_fullpath_gz_part, to_fullpath_gz, strerror(errno)); } + pg_free(buf); + return 0; } #endif @@ -1518,7 +1511,7 @@ get_wal_file(const char *filename, const char *from_fullpath, } /* - * Copy local WAL segment with possible decompression. + * Copy WAL segment with possible decompression from local archive. * Return codes: * FILE_MISSING (-1) * OPEN_FAILED (-2) @@ -1608,14 +1601,15 @@ get_wal_file_internal(const char *from_path, const char *to_path, FILE *out, { read_len = fread(buf, 1, OUT_BUF_SIZE, in); - if (read_len < 0 || ferror(in)) + if (ferror(in)) { elog(WARNING, "Thread [%d]: Cannot read source WAL file \"%s\": %s", thread_num, from_path, strerror(errno)); exit_code = READ_FAILED; break; } - else if (read_len == 0 && feof(in)) + + if (read_len == 0 && feof(in)) break; }