1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-12-12 11:45:24 +02:00

Refactor code related to WAL streaming. Optimize filelist gathering for streamed xlog files.

Now streaming thread calculates CRC and adds file info to the filelist after each finished segment.
This commit is contained in:
anastasia 2020-11-11 23:10:17 +03:00
parent dad2747ddc
commit 1d3c333641
3 changed files with 49 additions and 56 deletions

View File

@ -270,6 +270,9 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
pgBackupGetPath(&current, external_prefix, lengthof(external_prefix),
EXTERNAL_DIR);
/* initialize backup's file list */
backup_files_list = parray_new();
/* start stream replication */
if (stream_wal)
{
@ -280,9 +283,6 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
current.start_lsn, current.tli);
}
/* initialize backup list */
backup_files_list = parray_new();
/* list files with the logical path. omit $PGDATA */
if (fio_is_remote(FIO_DB_HOST))
fio_list_dir(backup_files_list, instance_config.pgdata,
@ -567,52 +567,11 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
/* close ssh session in main thread */
fio_disconnect();
/* Add archived xlog files into the list of files of this backup */
if (stream_wal)
{
parray *xlog_files_list;
char pg_xlog_path[MAXPGPATH];
char wal_full_path[MAXPGPATH];
/*
* Add archived xlog files into the list of files of this backup
* NOTHING TO DO HERE
*/
/* Scan backup PG_XLOG_DIR */
xlog_files_list = parray_new();
join_path_components(pg_xlog_path, database_path, PG_XLOG_DIR);
dir_list_file(xlog_files_list, pg_xlog_path, false, true, false, false, true, 0,
FIO_BACKUP_HOST);
/* TODO: Drop streamed WAL segments greater than stop_lsn */
for (i = 0; i < parray_num(xlog_files_list); i++)
{
pgFile *file = (pgFile *) parray_get(xlog_files_list, i);
join_path_components(wal_full_path, pg_xlog_path, file->rel_path);
if (!S_ISREG(file->mode))
continue;
file->crc = pgFileGetCRC(wal_full_path, true, false);
file->write_size = file->size;
/* overwrite rel_path, because now it is relative to
* /backup_dir/backups/instance_name/backup_id/database/pg_xlog/
*/
pg_free(file->rel_path);
/* Now it is relative to /backup_dir/backups/instance_name/backup_id/database/ */
file->rel_path = pgut_strdup(GetRelativePath(wal_full_path, database_path));
file->name = last_dir_separator(file->rel_path);
if (file->name == NULL) // TODO: do it in pgFileInit
file->name = file->rel_path;
else
file->name++;
}
/* Add xlog files into the list of backed up files */
parray_concat(backup_files_list, xlog_files_list);
parray_free(xlog_files_list);
}
/* write database map to file and add it to control file */
if (database_map)
@ -1920,7 +1879,10 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn,
if (stream_wal)
{
wait_WAL_streaming_end();
/* This function will also add list of xlog files
* to the passed filelist */
if(wait_WAL_streaming_end(backup_files_list))
elog(ERROR, "WAL streaming failed");
pgBackupGetPath2(backup, stream_xlog_path,
lengthof(stream_xlog_path),

View File

@ -1177,6 +1177,5 @@ extern XLogRecPtr stop_backup_lsn;
extern void start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path,
ConnectionOptions *conn_opt,
XLogRecPtr startpos, TimeLineID starttli);
extern void wait_WAL_streaming_end(void);
extern int wait_WAL_streaming_end(parray *backup_files_list);
#endif /* PG_PROBACKUP_H */

View File

@ -57,6 +57,8 @@ typedef struct
static pthread_t stream_thread;
static StreamThreadArg stream_thread_arg = {"", NULL, 1};
static parray *xlog_files_list = NULL;
static void IdentifySystem(StreamThreadArg *stream_thread_arg);
static int checkpoint_timeout(PGconn *backup_conn);
static void *StreamLog(void *arg);
@ -273,10 +275,38 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
/* we assume that we get called once at the end of each segment */
if (segment_finished)
{
XLogSegNo xlog_segno;
char wal_segment_name[MAXPGPATH];
char wal_segment_fullpath[MAXPGPATH];
pgFile *file;
elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
(uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
/* TODO Add streamed file to file list */
/* Add streamed xlog file into the backup's list of files */
if (!xlog_files_list)
xlog_files_list = parray_new();
GetXLogSegNo(xlogpos, xlog_segno, instance_config.xlog_seg_size);
GetXLogFileName(wal_segment_name, timeline, xlog_segno,
instance_config.xlog_seg_size);
join_path_components(wal_segment_fullpath,
stream_thread_arg.basedir, wal_segment_name);
/*
* NOTE We pass wal_segment_name as a relpath, since now we don't have
* any subdirs in wal directory structure
*/
file = pgFileNew(wal_segment_fullpath, wal_segment_name, false, 0,
FIO_BACKUP_HOST);
file->name = file->rel_path;
file->crc = pgFileGetCRC(wal_segment_fullpath, true, false);
/* Should we recheck it using stat? */
file->write_size = instance_config.xlog_seg_size;
file->uncompressed_size = instance_config.xlog_seg_size;
parray_append(xlog_files_list, file);
}
/*
@ -359,10 +389,12 @@ start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOption
}
/* Wait for the completion of stream */
void
wait_WAL_streaming_end(void)
int
wait_WAL_streaming_end(parray *backup_files_list)
{
parray_concat(backup_files_list, xlog_files_list);
parray_free(xlog_files_list);
pthread_join(stream_thread, NULL);
if (stream_thread_arg.ret == 1)
elog(ERROR, "WAL streaming failed");
return stream_thread_arg.ret;
}