diff --git a/src/parsexlog.c b/src/parsexlog.c
index 9411d291..5a6ed130 100644
--- a/src/parsexlog.c
+++ b/src/parsexlog.c
@@ -86,6 +86,7 @@ static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime
 
 typedef struct XLogPageReadPrivate
 {
+	int			thread_num;
 	const char *archivedir;
 	TimeLineID	tli;
 
@@ -106,7 +107,6 @@ typedef struct XLogPageReadPrivate
 /* An argument for a thread function */
 typedef struct
 {
-	int			thread_num;
 	XLogPageReadPrivate private_data;
 
 	XLogRecPtr	startpoint;
@@ -134,6 +134,55 @@ static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data,
 static XLogSegNo nextSegNoToRead = 0;
 static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER;
 
+/*
+ * Do manual switch of this reader to the next unclaimed WAL segment.
+ *
+ * Returns false once the claimed segment is past arg->endSegNo, else true.
+ */
+static bool
+switchToNextWal(XLogReaderState *xlogreader, xlog_thread_arg *arg)
+{
+	XLogPageReadPrivate *private_data;
+	XLogRecPtr	found;
+
+	private_data = (XLogPageReadPrivate *) xlogreader->private_data;
+	private_data->need_switch = false;
+
+	/* Critical section: claim the shared next-segment counter value */
+	pthread_lock(&wal_segment_mutex);
+	Assert(nextSegNoToRead);
+	private_data->xlogsegno = nextSegNoToRead;
+	nextSegNoToRead++;
+	pthread_mutex_unlock(&wal_segment_mutex);
+
+	/* We've reached the end */
+	if (private_data->xlogsegno > arg->endSegNo)
+		return false;
+
+	/* Adjust next record position */
+	XLogSegNoOffsetToRecPtr(private_data->xlogsegno, 0, arg->startpoint);
+	/* Skip over the page header and contrecord if any */
+	found = XLogFindNextRecord(xlogreader, arg->startpoint);
+
+	/*
+	 * We get invalid WAL record pointer usually when WAL segment is
+	 * absent or is corrupted.
+	 */
+	if (XLogRecPtrIsInvalid(found))
+	{
+		elog(WARNING, "could not read WAL record at %X/%X",
+			 (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint));
+		PrintXLogCorruptionMsg(private_data, ERROR);
+	}
+	arg->startpoint = found;
+
+	elog(VERBOSE, "Thread %d switched to LSN %X/%X",
+		 private_data->thread_num,
+		 (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint));
+
+	return true;
+}
+
 /*
  * extractPageMap() worker.
  */
@@ -150,7 +199,7 @@ doExtractPageMap(void *arg)
 	private_data = &extract_arg->private_data;
 	xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, private_data);
 	if (xlogreader == NULL)
-		elog(ERROR, "out of memory");
+		elog(ERROR, "Thread [%d]: out of memory", private_data->thread_num);
 	xlogreader->system_identifier = system_identifier;
 
 	found = XLogFindNextRecord(xlogreader, extract_arg->startpoint);
@@ -161,15 +210,16 @@ doExtractPageMap(void *arg)
 	 */
 	if (XLogRecPtrIsInvalid(found))
 	{
-		elog(WARNING, "could not read WAL record at %X/%X",
+		elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X",
+			 private_data->thread_num,
 			 (uint32) (extract_arg->startpoint >> 32),
 			 (uint32) (extract_arg->startpoint));
 		PrintXLogCorruptionMsg(private_data, ERROR);
 	}
 	extract_arg->startpoint = found;
 
-	elog(VERBOSE, "Start LSN of thread %d: %X/%X",
-		 extract_arg->thread_num,
+	elog(VERBOSE, "Thread [%d]: Starting LSN: %X/%X",
+		 private_data->thread_num,
 		 (uint32) (extract_arg->startpoint >> 32),
 		 (uint32) (extract_arg->startpoint));
 
@@ -181,7 +231,18 @@ doExtractPageMap(void *arg)
 		XLogRecord *record;
 
 		if (interrupted)
-			elog(ERROR, "Interrupted during WAL reading");
+			elog(ERROR, "Thread [%d]: Interrupted during WAL reading",
+				 private_data->thread_num);
+
+		/*
+		 * We need to switch to the next WAL segment after reading previous
+		 * record. It may happen if we read contrecord.
+ */ + if (private_data->need_switch) + { + if (!switchToNextWal(xlogreader, extract_arg)) + break; + } record = XLogReadRecord(xlogreader, extract_arg->startpoint, &errormsg); @@ -190,23 +251,15 @@ doExtractPageMap(void *arg) XLogRecPtr errptr; /* - * Try to switch to the next WAL segment. Usually - * SimpleXLogPageRead() does it by itself. But here we need to do it - * manually to support threads. + * There is no record, try to switch to the next WAL segment. + * Usually SimpleXLogPageRead() does it by itself. But here we need + * to do it manually to support threads. */ - if (private_data->need_switch) + if (private_data->need_switch && errormsg == NULL) { - private_data->need_switch = false; - - /* Critical section */ - pthread_lock(&wal_segment_mutex); - Assert(nextSegNoToRead); - private_data->xlogsegno = nextSegNoToRead; - nextSegNoToRead++; - pthread_mutex_unlock(&wal_segment_mutex); - - /* We reach the end */ - if (private_data->xlogsegno > extract_arg->endSegNo) + if (switchToNextWal(xlogreader, extract_arg)) + continue; + else break; /* Adjust next record position */ @@ -220,15 +273,16 @@ doExtractPageMap(void *arg) */ if (XLogRecPtrIsInvalid(found)) { - elog(WARNING, "could not read WAL record at %X/%X", + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", + private_data->thread_num, (uint32) (extract_arg->startpoint >> 32), (uint32) (extract_arg->startpoint)); PrintXLogCorruptionMsg(private_data, ERROR); } extract_arg->startpoint = found; - elog(VERBOSE, "Thread %d switched to LSN %X/%X", - extract_arg->thread_num, + elog(VERBOSE, "Thread [%d]: switched to LSN %X/%X", + private_data->thread_num, (uint32) (extract_arg->startpoint >> 32), (uint32) (extract_arg->startpoint)); @@ -239,11 +293,13 @@ doExtractPageMap(void *arg) extract_arg->startpoint : xlogreader->EndRecPtr; if (errormsg) - elog(WARNING, "could not read WAL record at %X/%X: %s", + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s", + private_data->thread_num, 
(uint32) (errptr >> 32), (uint32) (errptr), errormsg); else - elog(WARNING, "could not read WAL record at %X/%X", + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", + private_data->thread_num, (uint32) (errptr >> 32), (uint32) (errptr)); /* @@ -317,7 +373,7 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli, for (i = 0; i < num_threads; i++) { InitXLogPageRead(&thread_args[i].private_data, archivedir, tli, false); - thread_args[i].thread_num = i; + thread_args[i].private_data.thread_num = i + 1; thread_args[i].startpoint = startpoint; thread_args[i].endpoint = endpoint; @@ -344,7 +400,7 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli, /* Run threads */ for (i = 0; i < threads_need; i++) { - elog(VERBOSE, "Start WAL reader thread: %d", i); + elog(VERBOSE, "Start WAL reader thread: %d", i + 1); pthread_create(&threads[i], NULL, doExtractPageMap, &thread_args[i]); } @@ -736,15 +792,38 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, */ if (!XLByteInSeg(targetPagePtr, private_data->xlogsegno)) { - CleanupXLogPageRead(xlogreader); + elog(VERBOSE, "Need to switch to segno next to %X/%X, current LSN %X/%X", + (uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr), + (uint32) (xlogreader->currRecPtr >> 32), + (uint32) (xlogreader->currRecPtr )); + /* - * Do not switch to next WAL segment in this function. Currently it is - * manually switched only in doExtractPageMap(). + * if the last record on the page is not complete, + * we must continue reading pages in the same thread */ - if (private_data->manual_switch) + if (!XLogRecPtrIsInvalid(xlogreader->currRecPtr) && + xlogreader->currRecPtr < targetPagePtr) { - private_data->need_switch = true; - return -1; + CleanupXLogPageRead(xlogreader); + + /* + * Switch to the next WAL segment after reading contrecord. 
+ */ + if (private_data->manual_switch) + private_data->need_switch = true; + } + else + { + CleanupXLogPageRead(xlogreader); + /* + * Do not switch to next WAL segment in this function. Currently it is + * manually switched only in doExtractPageMap(). + */ + if (private_data->manual_switch) + { + private_data->need_switch = true; + return -1; + } } } @@ -761,7 +840,9 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (fileExists(private_data->xlogpath)) { - elog(LOG, "Opening WAL segment \"%s\"", private_data->xlogpath); + elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"", + private_data->thread_num, + private_data->xlogpath); private_data->xlogexists = true; private_data->xlogfile = open(private_data->xlogpath, @@ -769,8 +850,10 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (private_data->xlogfile < 0) { - elog(WARNING, "Could not open WAL segment \"%s\": %s", - private_data->xlogpath, strerror(errno)); + elog(WARNING, "Thread [%d]: Could not open WAL segment \"%s\": %s", + private_data->thread_num, + private_data->xlogpath, + strerror(errno)); return -1; } } @@ -783,16 +866,16 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, private_data->xlogpath); if (fileExists(private_data->gz_xlogpath)) { - elog(LOG, "Opening compressed WAL segment \"%s\"", - private_data->gz_xlogpath); + elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"", + private_data->thread_num, private_data->gz_xlogpath); private_data->xlogexists = true; private_data->gz_xlogfile = gzopen(private_data->gz_xlogpath, "rb"); if (private_data->gz_xlogfile == NULL) { - elog(WARNING, "Could not open compressed WAL segment \"%s\": %s", - private_data->gz_xlogpath, strerror(errno)); + elog(WARNING, "Thread [%d]: Could not open compressed WAL segment \"%s\": %s", + private_data->thread_num, private_data->gz_xlogpath, strerror(errno)); return -1; } } @@ -814,15 +897,15 @@ SimpleXLogPageRead(XLogReaderState 
*xlogreader, XLogRecPtr targetPagePtr, { if (lseek(private_data->xlogfile, (off_t) targetPageOff, SEEK_SET) < 0) { - elog(WARNING, "Could not seek in WAL segment \"%s\": %s", - private_data->xlogpath, strerror(errno)); + elog(WARNING, "Thread [%d]: Could not seek in WAL segment \"%s\": %s", + private_data->thread_num, private_data->xlogpath, strerror(errno)); return -1; } if (read(private_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { - elog(WARNING, "Could not read from WAL segment \"%s\": %s", - private_data->xlogpath, strerror(errno)); + elog(WARNING, "Thread [%d]: Could not read from WAL segment \"%s\": %s", + private_data->thread_num, private_data->xlogpath, strerror(errno)); return -1; } } @@ -831,7 +914,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, { if (gzseek(private_data->gz_xlogfile, (z_off_t) targetPageOff, SEEK_SET) == -1) { - elog(WARNING, "Could not seek in compressed WAL segment \"%s\": %s", + elog(WARNING, "Thread [%d]: Could not seek in compressed WAL segment \"%s\": %s", + private_data->thread_num, private_data->gz_xlogpath, get_gz_error(private_data->gz_xlogfile)); return -1; @@ -839,7 +923,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (gzread(private_data->gz_xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { - elog(WARNING, "Could not read from compressed WAL segment \"%s\": %s", + elog(WARNING, "Thread [%d]: Could not read from compressed WAL segment \"%s\": %s", + private_data->thread_num, private_data->gz_xlogpath, get_gz_error(private_data->gz_xlogfile)); return -1; @@ -910,15 +995,19 @@ PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel) * We throw a WARNING here to be able to update backup status. 
*/ if (!private_data->xlogexists) - elog(elevel, "WAL segment \"%s\" is absent", private_data->xlogpath); + elog(elevel, "Thread [%d]: WAL segment \"%s\" is absent", + private_data->thread_num, + private_data->xlogpath); else if (private_data->xlogfile != -1) - elog(elevel, "Possible WAL corruption. " + elog(elevel, "Thread [%d]: Possible WAL corruption. " "Error has occured during reading WAL segment \"%s\"", + private_data->thread_num, private_data->xlogpath); #ifdef HAVE_LIBZ else if (private_data->gz_xlogfile != NULL) - elog(elevel, "Possible WAL corruption. " + elog(elevel, "Thread [%d]: Possible WAL corruption. " "Error has occured during reading WAL segment \"%s\"", + private_data->thread_num, private_data->gz_xlogpath); #endif } diff --git a/tests/page.py b/tests/page.py index aa20da66..c4a226d4 100644 --- a/tests/page.py +++ b/tests/page.py @@ -782,8 +782,8 @@ class PageBackupTest(ProbackupTest, unittest.TestCase): wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join( wals_dir, f)) and not f.endswith('.backup')] wals = map(str, wals) - file = os.path.join(wals_dir, max(wals)) -# file = os.path.join(wals_dir, '000000010000000000000004') + # file = os.path.join(wals_dir, max(wals)) + file = os.path.join(wals_dir, '000000010000000000000004') print(file) with open(file, "rb+", 0) as f: f.seek(42) @@ -809,7 +809,8 @@ class PageBackupTest(ProbackupTest, unittest.TestCase): 'INFO: Wait for LSN' in e.message and 'in archived WAL segment' in e.message and 'WARNING: could not read WAL record at' in e.message and - 'ERROR: WAL segment "{0}" is absent\n'.format( + 'incorrect resource manager data checksum in record at' in e.message and + 'ERROR: Possible WAL corruption. 
Error has occured during reading WAL segment "{0}"'.format( file) in e.message, '\n Unexpected Error Message: {0}\n CMD: {1}'.format( repr(e.message), self.cmd)) @@ -834,7 +835,8 @@ class PageBackupTest(ProbackupTest, unittest.TestCase): 'INFO: Wait for LSN' in e.message and 'in archived WAL segment' in e.message and 'WARNING: could not read WAL record at' in e.message and - 'ERROR: WAL segment "{0}" is absent\n'.format( + 'incorrect resource manager data checksum in record at' in e.message and + 'ERROR: Possible WAL corruption. Error has occured during reading WAL segment "{0}"'.format( file) in e.message, '\n Unexpected Error Message: {0}\n CMD: {1}'.format( repr(e.message), self.cmd))