diff --git a/src/backup.c b/src/backup.c index 9a1cd900..0578f210 100644 --- a/src/backup.c +++ b/src/backup.c @@ -1382,8 +1382,8 @@ wait_wal_lsn(XLogRecPtr target_lsn, bool is_start_lsn, TimeLineID tli, { XLogRecPtr res; - res = get_prior_record_lsn(wal_segment_dir, current.start_lsn, target_lsn, tli, - in_prev_segment, instance_config.xlog_seg_size); + res = get_prior_record_lsn(wal_segment_dir, current.start_lsn, target_lsn, tli, + in_prev_segment, instance_config.xlog_seg_size); if (!XLogRecPtrIsInvalid(res)) { diff --git a/src/parsexlog.c b/src/parsexlog.c index 53597523..38c62c6a 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -846,7 +846,14 @@ get_prior_record_lsn(const char *archivedir, XLogRecPtr start_lsn, */ GetXLogSegNo(start_lsn, start_segno, wal_seg_size); if (start_segno == segno) + { startpoint = start_lsn; +#if PG_VERSION_NUM >= 130000 + if (XLogRecPtrIsInvalid(startpoint)) + startpoint = SizeOfXLogShortPHD; + XLogBeginRead(xlogreader, startpoint); +#endif + } else { XLogRecPtr found; diff --git a/src/stream.c b/src/stream.c index bf4885ab..f47ea9e6 100644 --- a/src/stream.c +++ b/src/stream.c @@ -64,6 +64,9 @@ static int checkpoint_timeout(PGconn *backup_conn); static void *StreamLog(void *arg); static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished); +static void append_wal_segment(parray *filelist, uint32 timeline, + XLogRecPtr xlogpos, char *basedir, + uint32 xlog_seg_size); /* * Run IDENTIFY_SYSTEM through a given connection and @@ -166,6 +169,8 @@ StreamLog(void *arg) */ stream_arg->startpos -= stream_arg->startpos % instance_config.xlog_seg_size; + xlog_files_list = parray_new(); + /* Initialize timeout */ stream_stop_begin = 0; @@ -239,6 +244,18 @@ StreamLog(void *arg) elog(ERROR, "Problem in receivexlog"); #endif + /* sort xlog_files_list */ + parray_qsort(xlog_files_list, pgFileCompareRelPathWithExternal); + + append_wal_segment(xlog_files_list, stream_arg->starttli, + stop_stream_lsn, (char *) stream_arg->basedir, + instance_config.xlog_seg_size); + + /* + * TODO: remove redundant WAL segments + * walk pg_wal and remove files with segno greater that of stop_lsn`s segno +1 + */ + elog(LOG, "finished streaming WAL at %X/%X (timeline %u)", (uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, stream_arg->starttli); stream_arg->ret = 0; @@ -275,46 +292,12 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) /* we assume that we get called once at the end of each segment */ if (segment_finished) { - XLogSegNo xlog_segno; - char wal_segment_name[MAXFNAMELEN]; - char wal_segment_relpath[MAXPGPATH]; - char wal_segment_fullpath[MAXPGPATH]; - pgFile *file = NULL; - - elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"), + elog(INFO, _("finished segment at %X/%X (timeline %u)"), (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline); - /* Add streamed xlog file into the backup's list of files */ - if (!xlog_files_list) - xlog_files_list = parray_new(); - - GetXLogSegNo(xlogpos, xlog_segno, instance_config.xlog_seg_size); - - /* - * xlogpos points to the current segment, and we need the finished - previous one - * inless xlogpos points to not 0 offset in segment - */ - if (WalSegmentOffset(xlogpos, instance_config.xlog_seg_size) == 0) - xlog_segno--; - - GetXLogFileName(wal_segment_name, timeline, xlog_segno, - instance_config.xlog_seg_size); - - join_path_components(wal_segment_fullpath, - stream_thread_arg.basedir, wal_segment_name); - - join_path_components(wal_segment_relpath, - PG_XLOG_DIR, wal_segment_name); - - /* append file to filelist */ - file = pgFileNew(wal_segment_fullpath, wal_segment_relpath, false, 0, FIO_BACKUP_HOST); - file->name = file->rel_path; - file->crc = pgFileGetCRC(wal_segment_fullpath, true, false); - - /* Should we recheck it using stat? */ - file->write_size = instance_config.xlog_seg_size; - file->uncompressed_size = instance_config.xlog_seg_size; - parray_append(xlog_files_list, file); + append_wal_segment(xlog_files_list, timeline, xlogpos, + (char*) stream_thread_arg.basedir, + instance_config.xlog_seg_size); } /* @@ -400,9 +383,60 @@ start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOption int wait_WAL_streaming_end(parray *backup_files_list) { - parray_concat(backup_files_list, xlog_files_list); - parray_free(xlog_files_list); + pthread_join(stream_thread, NULL); - pthread_join(stream_thread, NULL); - return stream_thread_arg.ret; + parray_concat(backup_files_list, xlog_files_list); + parray_free(xlog_files_list); + return stream_thread_arg.ret; } + +/* Append streamed WAL segment to filelist */ +void +append_wal_segment(parray *filelist, uint32 timeline, XLogRecPtr xlogpos, char *basedir, uint32 xlog_seg_size) +{ + XLogSegNo xlog_segno; + char wal_segment_name[MAXFNAMELEN]; + char wal_segment_relpath[MAXPGPATH]; + char wal_segment_fullpath[MAXPGPATH]; + pgFile *file = NULL; + + GetXLogSegNo(xlogpos, xlog_segno, xlog_seg_size); + + /* + * xlogpos points to the current segment, and we need the finished - previous one + * inless xlogpos points to not 0 offset in segment + */ + if (WalSegmentOffset(xlogpos, xlog_seg_size) == 0) + xlog_segno--; + + GetXLogFileName(wal_segment_name, timeline, xlog_segno, xlog_seg_size); + + join_path_components(wal_segment_fullpath, basedir, wal_segment_name); + join_path_components(wal_segment_relpath, PG_XLOG_DIR, wal_segment_name); + + file = pgFileNew(wal_segment_fullpath, wal_segment_relpath, false, 0, FIO_BACKUP_HOST); + file->name = file->rel_path; + + /* + * Check if file is already in the list + * stop_lsn segment can be added to this list twice, so + * try not to add duplicates + */ + if (parray_bsearch(filelist, file, pgFileCompareRelPathWithExternal)) + { + if (!parray_rm(filelist, file, pgFileCompareRelPathWithExternal)) + elog(ERROR, "Failed to remove duplicate from array of streamed segments: %s", + file->rel_path); + } + + /* calculate crc */ + file->crc = pgFileGetCRC(wal_segment_fullpath, true, false); + + /* Should we recheck it using stat? */ + file->write_size = xlog_seg_size; + file->uncompressed_size = xlog_seg_size; + + /* append file to filelist */ + elog(VERBOSE, "Append WAL segment: \"%s\"", wal_segment_relpath); + parray_append(filelist, file); +} \ No newline at end of file