1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-17 21:18:00 +02:00

use dynamically allocated buffers in archive-push and archive-get

This commit is contained in:
Grigory Smolkin 2020-04-24 17:37:16 +03:00
parent a9aab29a04
commit b7f8283d48

View File

@ -414,8 +414,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
{
FILE *in = NULL;
int out = -1;
char buf[STDIO_BUFSIZE];
// char buf[XLOG_BLCKSZ];
char *buf = pgut_malloc(OUT_BUF_SIZE); /* 1MB buffer */
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
/* partial handling */
@ -433,11 +432,14 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
canonicalize_path(to_fullpath);
/* Open source file for read */
in = fio_fopen(from_fullpath, PG_BINARY_R, FIO_DB_HOST);
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
elog(ERROR, "Thread [%d]: Cannot open source file \"%s\": %s",
thread_num, from_fullpath, strerror(errno));
/* disable stdio buffering for input file */
setvbuf(in, NULL, _IONBF, BUFSIZ);
/* open destination partial file for write */
snprintf(to_fullpath_part, sizeof(to_fullpath_part), "%s.part", to_fullpath);
@ -542,14 +544,14 @@ part_opened:
pg_crc32 crc32_dst;
crc32_src = fio_get_crc32(from_fullpath, FIO_DB_HOST, false);
crc32_dst = fio_get_crc32(to_fullpath, FIO_DB_HOST, false);
crc32_dst = fio_get_crc32(to_fullpath, FIO_BACKUP_HOST, false);
if (crc32_src == crc32_dst)
{
elog(LOG, "Thread [%d]: WAL file already exists in archive with the same "
"checksum, skip pushing: \"%s\"", thread_num, from_fullpath);
/* cleanup */
fio_fclose(in);
fclose(in);
fio_close(out);
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
return 1;
@ -574,11 +576,11 @@ part_opened:
/* copy content */
for (;;)
{
ssize_t read_len = 0;
size_t read_len = 0;
read_len = fio_fread(in, buf, sizeof(buf));
read_len = fread(buf, 1, OUT_BUF_SIZE, in);
if (read_len < 0)
if (ferror(in))
{
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
elog(ERROR, "Thread [%d]: Cannot read source file \"%s\": %s",
@ -595,17 +597,12 @@ part_opened:
}
}
if (read_len == 0)
if (feof(in))
break;
}
/* close source file */
if (fio_fclose(in))
{
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
elog(ERROR, "Thread [%d]: Cannot close source WAL file \"%s\": %s",
thread_num, from_fullpath, strerror(errno));
}
fclose(in);
/* close temp file */
if (fio_close(out) != 0)
@ -636,6 +633,7 @@ part_opened:
thread_num, to_fullpath_part, to_fullpath, strerror(errno));
}
pg_free(buf);
return 0;
}
@ -654,8 +652,7 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
{
FILE *in = NULL;
gzFile out = NULL;
int errno_temp;
char buf[STDIO_BUFSIZE];
char *buf = pgut_malloc(OUT_BUF_SIZE);
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
char to_fullpath_gz[MAXPGPATH];
@ -681,11 +678,14 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
snprintf(to_fullpath_gz_part, sizeof(to_fullpath_gz_part), "%s.part", to_fullpath_gz);
/* Open source file for read */
in = fio_fopen(from_fullpath, PG_BINARY_R, FIO_DB_HOST);
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
elog(ERROR, "Thread [%d]: Cannot open source WAL file \"%s\": %s",
thread_num, from_fullpath, strerror(errno));
/* disable stdio buffering for input file */
setvbuf(in, NULL, _IONBF, BUFSIZ);
/* Grab lock by creating temp file in exclusive mode */
out = fio_gzopen(to_fullpath_gz_part, PG_BINARY_W, compress_level, FIO_BACKUP_HOST);
if (out == NULL)
@ -787,16 +787,16 @@ part_opened:
pg_crc32 crc32_src;
pg_crc32 crc32_dst;
/* what if one of them goes missing */
/* TODO: what if one of them goes missing? */
crc32_src = fio_get_crc32(from_fullpath, FIO_DB_HOST, false);
crc32_dst = fio_get_crc32(to_fullpath_gz, FIO_DB_HOST, true);
crc32_dst = fio_get_crc32(to_fullpath_gz, FIO_BACKUP_HOST, true);
if (crc32_src == crc32_dst)
{
elog(LOG, "Thread [%d]: WAL file already exists in archive with the same "
"checksum, skip pushing: \"%s\"", thread_num, from_fullpath);
/* cleanup */
fio_fclose(in);
fclose(in);
fio_gzclose(out);
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
return 1;
@ -811,8 +811,6 @@ part_opened:
/* Overwriting is forbidden,
* so we must unlink partial file and exit with error.
*/
fio_fclose(in);
fio_gzclose(out);
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
elog(ERROR, "Thread [%d]: WAL file already exists in archive with "
"different checksum: \"%s\"", thread_num, to_fullpath_gz);
@ -823,11 +821,11 @@ part_opened:
/* copy content */
for (;;)
{
ssize_t read_len = 0;
size_t read_len = 0;
read_len = fio_fread(in, buf, sizeof(buf));
read_len = fread(buf, 1, OUT_BUF_SIZE, in);
if (read_len < 0)
if (ferror(in))
{
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
elog(ERROR, "Thread [%d]: Cannot read from source file \"%s\": %s",
@ -838,32 +836,25 @@ part_opened:
{
if (fio_gzwrite(out, buf, read_len) != read_len)
{
errno_temp = errno;
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
elog(ERROR, "Thread [%d]: Cannot write to compressed temp WAL file \"%s\": %s",
thread_num, to_fullpath_gz_part, get_gz_error(out, errno_temp));
thread_num, to_fullpath_gz_part, get_gz_error(out, errno));
}
}
if (read_len == 0)
if (feof(in))
break;
}
/* close source file */
if (fio_fclose(in))
{
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
elog(ERROR, "Thread [%d]: Cannot close source WAL file \"%s\": %s",
thread_num, from_fullpath, strerror(errno));
}
fclose(in);
/* close temp file */
if (fio_gzclose(out) != 0)
{
errno_temp = errno;
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
elog(ERROR, "Thread [%d]: Cannot close compressed temp WAL file \"%s\": %s",
thread_num, to_fullpath_gz_part, strerror(errno_temp));
thread_num, to_fullpath_gz_part, strerror(errno));
}
/* sync temp file to disk */
@ -887,6 +878,8 @@ part_opened:
thread_num, to_fullpath_gz_part, to_fullpath_gz, strerror(errno));
}
pg_free(buf);
return 0;
}
#endif
@ -1518,7 +1511,7 @@ get_wal_file(const char *filename, const char *from_fullpath,
}
/*
* Copy local WAL segment with possible decompression.
* Copy WAL segment with possible decompression from local archive.
* Return codes:
* FILE_MISSING (-1)
* OPEN_FAILED (-2)
@ -1608,14 +1601,15 @@ get_wal_file_internal(const char *from_path, const char *to_path, FILE *out,
{
read_len = fread(buf, 1, OUT_BUF_SIZE, in);
if (read_len < 0 || ferror(in))
if (ferror(in))
{
elog(WARNING, "Thread [%d]: Cannot read source WAL file \"%s\": %s",
thread_num, from_path, strerror(errno));
exit_code = READ_FAILED;
break;
}
else if (read_len == 0 && feof(in))
if (read_len == 0 && feof(in))
break;
}