1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-01 13:27:47 +02:00

update backup_content.control during backup;

implement buffered write of backup_content.control
This commit is contained in:
Anastasia 2019-06-07 22:22:26 +03:00
parent e66d96ac16
commit 66a1c643d5
3 changed files with 98 additions and 51 deletions

View File

@ -462,6 +462,7 @@ do_backup_instance(PGconn *backup_conn)
arg->prev_start_lsn = prev_backup_start_lsn;
arg->conn_arg.conn = NULL;
arg->conn_arg.cancel_conn = NULL;
arg->thread_num = i+1;
/* By default there are some error */
arg->ret = 1;
}
@ -1987,6 +1988,9 @@ backup_files(void *arg)
int i;
backup_files_arg *arguments = (backup_files_arg *) arg;
int n_backup_files_list = parray_num(arguments->files_list);
static time_t prev_time;
prev_time = current.start_time;
/* backup a file */
for (i = 0; i < n_backup_files_list; i++)
@ -1995,6 +1999,21 @@ backup_files(void *arg)
struct stat buf;
pgFile *file = (pgFile *) parray_get(arguments->files_list, i);
if (arguments->thread_num == 1)
{
/* update every 10 seconds */
if ((difftime(time(NULL), prev_time)) > 10)
{
prev_time = time(NULL);
elog(INFO, "write_backup_filelist N=%ld, starttime %ld, time %ld",
parray_num(backup_files_list), current.start_time, prev_time);
write_backup_filelist(&current, backup_files_list, instance_config.pgdata,
NULL, arguments->external_dirs);
}
}
if (!pg_atomic_test_set_flag(&file->lock))
continue;
elog(VERBOSE, "Copying file: \"%s\" ", file->path);

View File

@ -632,22 +632,96 @@ void
write_backup_filelist(pgBackup *backup, parray *files, const char *root,
const char *external_prefix, parray *external_list)
{
FILE *fp;
FILE *out;
char path[MAXPGPATH];
char path_temp[MAXPGPATH];
int errno_temp;
size_t i = 0;
#define BUFFERSZ BLCKSZ*500
char buf[BUFFERSZ];
size_t write_len = 0;
pgBackupGetPath(backup, path, lengthof(path), DATABASE_FILE_LIST);
snprintf(path_temp, sizeof(path_temp), "%s.tmp", path);
fp = fio_fopen(path_temp, PG_BINARY_W, FIO_BACKUP_HOST);
if (fp == NULL)
out = fio_fopen(path_temp, PG_BINARY_W, FIO_BACKUP_HOST);
if (out == NULL)
elog(ERROR, "Cannot open file list \"%s\": %s", path_temp,
strerror(errno));
print_file_list(fp, files, root, external_prefix, external_list);
/* print each file in the list */
while(i < parray_num(files))
{
pgFile *file = (pgFile *) parray_get(files, i);
char *path = file->path;
char line[BLCKSZ];
int len = 0;
if (fio_fflush(fp) || fio_fclose(fp))
/* omit root directory portion */
if (root && strstr(path, root) == path)
path = GetRelativePath(path, root);
else if (file->external_dir_num && !external_prefix)
{
Assert(external_list);
path = GetRelativePath(path, parray_get(external_list,
file->external_dir_num - 1));
}
len = sprintf(line, "{\"path\":\"%s\", \"size\":\"" INT64_FORMAT "\", "
"\"mode\":\"%u\", \"is_datafile\":\"%u\", "
"\"is_cfs\":\"%u\", \"crc\":\"%u\", "
"\"compress_alg\":\"%s\", \"external_dir_num\":\"%d\"",
path, file->write_size, file->mode,
file->is_datafile ? 1 : 0,
file->is_cfs ? 1 : 0,
file->crc,
deparse_compress_alg(file->compress_alg),
file->external_dir_num);
if (file->is_datafile)
len += sprintf(line+len, ",\"segno\":\"%d\"", file->segno);
if (file->linked)
len += sprintf(line+len, ",\"linked\":\"%s\"", file->linked);
if (file->n_blocks != BLOCKNUM_INVALID)
len += sprintf(line+len, ",\"n_blocks\":\"%i\"", file->n_blocks);
len += sprintf(line+len, "}\n");
if (write_len + len <= BUFFERSZ)
{
memcpy(buf+write_len, line, len);
write_len += len;
}
else
{
/* write buffer to file */
if (fio_fwrite(out, buf, write_len) != write_len)
{
errno_temp = errno;
fio_unlink(path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write file list \"%s\": %s",
path_temp, strerror(errno));
}
/* reset write_len */
write_len = 0;
}
i++;
}
/* write what is left in the buffer to file */
if (write_len > 0)
if (fio_fwrite(out, buf, write_len) != write_len)
{
errno_temp = errno;
fio_unlink(path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write file list \"%s\": %s",
path_temp, strerror(errno));
}
if (fio_fflush(out) || fio_fclose(out))
{
errno_temp = errno;
fio_unlink(path_temp, FIO_BACKUP_HOST);

View File

@ -1271,52 +1271,6 @@ get_external_remap(char *current_dir)
return current_dir;
}
/*
* Print backup content list.
*/
void
print_file_list(FILE *out, const parray *files, const char *root,
const char *external_prefix, parray *external_list)
{
size_t i;
/* print each file in the list */
for (i = 0; i < parray_num(files); i++)
{
pgFile *file = (pgFile *) parray_get(files, i);
char *path = file->path;
/* omit root directory portion */
if (root && strstr(path, root) == path)
path = GetRelativePath(path, root);
else if (file->external_dir_num && !external_prefix)
{
Assert(external_list);
path = GetRelativePath(path, parray_get(external_list,
file->external_dir_num - 1));
}
fio_fprintf(out, "{\"path\":\"%s\", \"size\":\"" INT64_FORMAT "\", "
"\"mode\":\"%u\", \"is_datafile\":\"%u\", "
"\"is_cfs\":\"%u\", \"crc\":\"%u\", "
"\"compress_alg\":\"%s\", \"external_dir_num\":\"%d\"",
path, file->write_size, file->mode,
file->is_datafile ? 1 : 0, file->is_cfs ? 1 : 0, file->crc,
deparse_compress_alg(file->compress_alg), file->external_dir_num);
if (file->is_datafile)
fio_fprintf(out, ",\"segno\":\"%d\"", file->segno);
if (file->linked)
fio_fprintf(out, ",\"linked\":\"%s\"", file->linked);
if (file->n_blocks != BLOCKNUM_INVALID)
fio_fprintf(out, ",\"n_blocks\":\"%i\"", file->n_blocks);
fio_fprintf(out, "}\n");
}
}
/* Parsing states for get_control_value() */
#define CONTROL_WAIT_NAME 1
#define CONTROL_INNAME 2