You've already forked pgbackrest
mirror of
https://github.com/pgbackrest/pgbackrest.git
synced 2025-07-15 01:04:37 +02:00
Use a single file to handle global errors in async archiving.
The prior behavior on a global error (i.e. not file-specific) was to write an individual error file for each WAL file being processed. On retry each of these error files would be removed and, if the error was persistent, they would then be recreated. In a busy environment this could mean tens or hundreds of thousands of files. Another issue was that the error files could not be written until a list of WAL files to process had been generated. This was easy enough for archive-get, but archive-push requires more processing, and any errors that happened while generating the list would only be reported in the pgBackRest log rather than the PostgreSQL log. Instead, write a single global.error file that applies to any WAL file that does not have an explicit ok or error file. This reduces churn and allows more errors to be reported directly to PostgreSQL.
This commit is contained in:
@ -49,6 +49,10 @@
|
||||
<p>Add <id>storageRepoWrite()</id> to storage helper.</p>
|
||||
</release-item>
|
||||
|
||||
<release-item>
|
||||
<p>Use a single file to handle global errors in async archiving.</p>
|
||||
</release-item>
|
||||
|
||||
<release-item>
|
||||
<p>Add document creation to XML objects.</p>
|
||||
</release-item>
|
||||
|
@ -24,6 +24,12 @@ STRING_EXTERN(WAL_SEGMENT_PARTIAL_REGEXP_STR, WAL_SEGMENT_
|
||||
STRING_EXTERN(WAL_SEGMENT_DIR_REGEXP_STR, WAL_SEGMENT_DIR_REGEXP);
|
||||
STRING_EXTERN(WAL_SEGMENT_FILE_REGEXP_STR, WAL_SEGMENT_FILE_REGEXP);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Global error file constant
|
||||
Base name of the status file used to record a global (not file-specific) error. It is combined with the error status extension to
form the file that is checked when a WAL segment has no file-specific error file, and it is the name used when an error is written
without a WAL segment.
***********************************************************************************************************************************/
|
||||
#define STATUS_FILE_GLOBAL "global"
|
||||
STRING_STATIC(STATUS_FILE_GLOBAL_STR, STATUS_FILE_GLOBAL);
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Get the correct spool queue based on the archive mode
|
||||
***********************************************************************************************************************************/
|
||||
@ -53,27 +59,27 @@ archiveAsyncStatus(ArchiveMode archiveMode, const String *walSegment, bool confe
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
String *errorFile = NULL;
|
||||
bool errorFileExists = false;
|
||||
|
||||
const String *spoolQueue = archiveAsyncSpoolQueue(archiveMode);
|
||||
|
||||
String *okFile = strNewFmt("%s" STATUS_EXT_OK, strPtr(walSegment));
|
||||
String *errorFile = strNewFmt("%s" STATUS_EXT_ERROR, strPtr(walSegment));
|
||||
|
||||
bool okFileExists = storageExistsNP(storageSpool(), strNewFmt("%s/%s", strPtr(spoolQueue), strPtr(okFile)));
|
||||
bool errorFileExists = storageExistsNP(storageSpool(), strNewFmt("%s/%s", strPtr(spoolQueue), strPtr(errorFile)));
|
||||
|
||||
// If both status files are found then warn, remove the files, and return false so the segment will be retried. This may be
|
||||
// a bug in the async process but it may also be a failed fsync or other filesystem issue. In any case, a hard failure here
|
||||
// would mean that archiving is completely stuck so it is better to attempt a retry.
|
||||
if (okFileExists && errorFileExists)
|
||||
// If the ok file does not exist then check to see if a file-specific or global error exists
|
||||
if (!okFileExists)
|
||||
{
|
||||
LOG_WARN(
|
||||
"multiple status files found in '%s' for WAL segment '%s' will be removed and the command retried",
|
||||
strPtr(storagePath(storageSpool(), spoolQueue)), strPtr(walSegment));
|
||||
// Check for a file-specific error first
|
||||
errorFile = strNewFmt("%s" STATUS_EXT_ERROR, strPtr(walSegment));
|
||||
errorFileExists = storageExistsNP(storageSpool(), strNewFmt("%s/%s", strPtr(spoolQueue), strPtr(errorFile)));
|
||||
|
||||
storageRemoveNP(storageSpoolWrite(), strNewFmt("%s/%s", strPtr(spoolQueue), strPtr(okFile)));
|
||||
okFileExists = false;
|
||||
|
||||
storageRemoveNP(storageSpoolWrite(), strNewFmt("%s/%s", strPtr(spoolQueue), strPtr(errorFile)));
|
||||
errorFileExists = false;
|
||||
// If that doesn't exist then check for a global error
|
||||
if (!errorFileExists)
|
||||
{
|
||||
errorFile = strNew(STATUS_FILE_GLOBAL STATUS_EXT_ERROR);
|
||||
errorFileExists = storageExistsNP(storageSpool(), strNewFmt("%s/%s", strPtr(spoolQueue), strPtr(errorFile)));
|
||||
}
|
||||
}
|
||||
|
||||
// If either of them exists then check what happened and report back
|
||||
@ -146,37 +152,28 @@ archiveAsyncStatus(ArchiveMode archiveMode, const String *walSegment, bool confe
|
||||
Write an error status file
|
||||
***********************************************************************************************************************************/
|
||||
void
|
||||
archiveAsyncStatusErrorWrite(ArchiveMode archiveMode, const String *walSegment, int code, const String *message, bool skipIfOk)
|
||||
archiveAsyncStatusErrorWrite(ArchiveMode archiveMode, const String *walSegment, int code, const String *message)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(ENUM, archiveMode);
|
||||
FUNCTION_LOG_PARAM(STRING, walSegment);
|
||||
FUNCTION_LOG_PARAM(INT, code);
|
||||
FUNCTION_LOG_PARAM(STRING, message);
|
||||
FUNCTION_LOG_PARAM(BOOL, skipIfOk);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(walSegment != NULL);
|
||||
ASSERT(code != 0);
|
||||
ASSERT(message != NULL);
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Only write the file if we are not worried about ok files or if the ok file does not exist
|
||||
if (!skipIfOk ||
|
||||
!(storageExistsNP(
|
||||
storageSpool(),
|
||||
strNewFmt("%s/%s" STATUS_EXT_OK, strPtr(archiveAsyncSpoolQueue(archiveMode)), strPtr(walSegment))) ||
|
||||
(archiveMode == archiveModeGet && storageExistsNP(
|
||||
storageSpool(), strNewFmt("%s/%s", strPtr(archiveAsyncSpoolQueue(archiveMode)), strPtr(walSegment))))))
|
||||
{
|
||||
const String *errorFile = walSegment == NULL ? STATUS_FILE_GLOBAL_STR : walSegment;
|
||||
|
||||
storagePutNP(
|
||||
storageNewWriteNP(
|
||||
storageSpoolWrite(),
|
||||
strNewFmt("%s/%s" STATUS_EXT_ERROR, strPtr(archiveAsyncSpoolQueue(archiveMode)), strPtr(walSegment))),
|
||||
strNewFmt("%s/%s" STATUS_EXT_ERROR, strPtr(archiveAsyncSpoolQueue(archiveMode)), strPtr(errorFile))),
|
||||
bufNewStr(strNewFmt("%d\n%s", code, strPtr(message))));
|
||||
}
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
|
@ -60,8 +60,7 @@ Functions
|
||||
***********************************************************************************************************************************/
|
||||
bool archiveAsyncStatus(ArchiveMode archiveMode, const String *walSegment, bool confessOnError);
|
||||
void archiveAsyncStatusOkWrite(ArchiveMode archiveMode, const String *walSegment);
|
||||
void archiveAsyncStatusErrorWrite(
|
||||
ArchiveMode archiveMode, const String *walSegment, int code, const String *message, bool skipIfOk);
|
||||
void archiveAsyncStatusErrorWrite(ArchiveMode archiveMode, const String *walSegment, int code, const String *message);
|
||||
|
||||
bool walIsPartial(const String *walSegment);
|
||||
bool walIsSegment(const String *walSegment);
|
||||
|
@ -285,6 +285,8 @@ cmdArchiveGetAsync(void)
|
||||
FUNCTION_LOG_VOID(logLevelDebug);
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
TRY_BEGIN()
|
||||
{
|
||||
// Check the parameters
|
||||
const StringList *walSegmentList = cfgCommandParam();
|
||||
@ -292,8 +294,6 @@ cmdArchiveGetAsync(void)
|
||||
if (strLstSize(walSegmentList) < 1)
|
||||
THROW(ParamInvalidError, "at least one wal segment is required");
|
||||
|
||||
TRY_BEGIN()
|
||||
{
|
||||
LOG_INFO(
|
||||
"get %u WAL file(s) from archive: %s%s", strLstSize(walSegmentList), strPtr(strLstGet(walSegmentList, 0)),
|
||||
strLstSize(walSegmentList) == 1 ?
|
||||
@ -351,22 +351,16 @@ cmdArchiveGetAsync(void)
|
||||
protocolParallelJobErrorCode(job), strPtr(protocolParallelJobErrorMessage(job)));
|
||||
|
||||
archiveAsyncStatusErrorWrite(
|
||||
archiveModeGet, walSegment, protocolParallelJobErrorCode(job), protocolParallelJobErrorMessage(job),
|
||||
false);
|
||||
archiveModeGet, walSegment, protocolParallelJobErrorCode(job), protocolParallelJobErrorMessage(job));
|
||||
}
|
||||
}
|
||||
}
|
||||
while (!protocolParallelDone(parallelExec));
|
||||
}
|
||||
// On any global error write a single error file to cover all unprocessed files
|
||||
CATCH_ANY()
|
||||
{
|
||||
// On any global error write the same error into every .error file unless the get was already successful
|
||||
for (unsigned int walSegmentIdx = 0; walSegmentIdx < strLstSize(walSegmentList); walSegmentIdx++)
|
||||
{
|
||||
archiveAsyncStatusErrorWrite(
|
||||
archiveModeGet, strLstGet(walSegmentList, walSegmentIdx), errorCode(), strNew(errorMessage()), true);
|
||||
}
|
||||
|
||||
archiveAsyncStatusErrorWrite(archiveModeGet, NULL, errorCode(), strNew(errorMessage()));
|
||||
RETHROW();
|
||||
}
|
||||
TRY_END();
|
||||
|
@ -80,25 +80,12 @@ testRun(void)
|
||||
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), true, "error status renamed to ok");
|
||||
harnessLogResult(
|
||||
"P00 WARN: WAL segment '000000010000000100000001' was not pushed due to error [25] and was manually skipped: error");
|
||||
TEST_RESULT_VOID(
|
||||
storageRemoveP(
|
||||
storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), .errorOnMissing = true),
|
||||
"remove ok");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
storagePutNP(
|
||||
storageNewWriteNP(storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment))), bufNew(0));
|
||||
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), false, "multiple status files returns false");
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
storageExistsNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment))), false,
|
||||
".error file was deleted");
|
||||
TEST_RESULT_BOOL(
|
||||
storageExistsNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment))), false,
|
||||
".ok file was deleted");
|
||||
|
||||
harnessLogResult(
|
||||
strPtr(
|
||||
strNewFmt(
|
||||
"P00 WARN: multiple status files found in '%s/archive/db/out' for WAL segment '000000010000000100000001'"
|
||||
" will be removed and the command retried", testPath())));
|
||||
|
||||
storagePutNP(
|
||||
storageNewWriteNP(storageSpoolWrite(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment))), bufNew(0));
|
||||
TEST_ERROR(
|
||||
@ -111,6 +98,13 @@ testRun(void)
|
||||
TEST_ERROR(archiveAsyncStatus(archiveModePush, segment, true), AssertError, "message");
|
||||
|
||||
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), false, "suppress error");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
storagePutNP(
|
||||
storageNewWriteNP(storageSpoolWrite(), strNew(STORAGE_SPOOL_ARCHIVE_OUT "/global.error")),
|
||||
bufNewZ("102\nexecute error"));
|
||||
|
||||
TEST_ERROR(archiveAsyncStatus(archiveModePush, strNew("anyfile"), true), ExecuteError, "execute error");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
@ -126,7 +120,7 @@ testRun(void)
|
||||
String *walSegment = strNew("000000010000000100000001");
|
||||
|
||||
TEST_RESULT_VOID(
|
||||
archiveAsyncStatusErrorWrite(archiveModeGet, walSegment, 25, strNew("error message"), false), "write error");
|
||||
archiveAsyncStatusErrorWrite(archiveModeGet, walSegment, 25, strNew("error message")), "write error");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(strNewBuf(storageGetNP(storageNewReadNP(storageTest, strNew("archive/db/in/000000010000000100000001.error"))))),
|
||||
"25\nerror message", "check error");
|
||||
@ -135,22 +129,22 @@ testRun(void)
|
||||
"remove error");
|
||||
|
||||
TEST_RESULT_VOID(
|
||||
archiveAsyncStatusErrorWrite(archiveModeGet, walSegment, 66, strNew("multi-line\nerror message"), true),
|
||||
"write error skip if ok (ok missing)");
|
||||
archiveAsyncStatusErrorWrite(archiveModeGet, NULL, 25, strNew("global error message")), "write global error");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(strNewBuf(storageGetNP(storageNewReadNP(storageTest, strNew("archive/db/in/000000010000000100000001.error"))))),
|
||||
"66\nmulti-line\nerror message", "check error");
|
||||
strPtr(strNewBuf(storageGetNP(storageNewReadNP(storageTest, strNew("archive/db/in/global.error"))))),
|
||||
"25\nglobal error message", "check global error");
|
||||
TEST_RESULT_VOID(
|
||||
storageRemoveP(storageTest, strNew("archive/db/in/000000010000000100000001.error"), .errorOnMissing = true),
|
||||
"remove error");
|
||||
storageRemoveP(storageTest, strNew("archive/db/in/global.error"), .errorOnMissing = true),
|
||||
"remove global error");
|
||||
|
||||
TEST_RESULT_VOID(
|
||||
archiveAsyncStatusOkWrite(archiveModeGet, walSegment), "write ok file");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(strNewBuf(storageGetNP(storageNewReadNP(storageTest, strNew("archive/db/in/000000010000000100000001.ok"))))),
|
||||
"", "check ok");
|
||||
TEST_RESULT_VOID(
|
||||
archiveAsyncStatusErrorWrite(archiveModeGet, walSegment, 101, strNew("more error message"), true),
|
||||
"write error skip if ok (ok present)");
|
||||
TEST_RESULT_BOOL(
|
||||
storageExistsNP(storageTest, strNew("archive/db/in/000000010000000100000001.error")), false, "error does not exist");
|
||||
storageRemoveP(storageTest, strNew("archive/db/in/000000010000000100000001.ok"), .errorOnMissing = true),
|
||||
"remove ok");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
|
@ -500,16 +500,16 @@ testRun(void)
|
||||
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000002.error")), false,
|
||||
"check 000000010000000100000002.error not in spool");
|
||||
TEST_RESULT_BOOL(
|
||||
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000003.error")), true,
|
||||
"check 000000010000000100000003.error in spool");
|
||||
storageExistsNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000003.error")), false,
|
||||
"check 000000010000000100000003.error not in spool");
|
||||
TEST_RESULT_STR(
|
||||
strPtr(
|
||||
strNewBuf(
|
||||
storageGetNP(
|
||||
storageNewReadNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/000000010000000100000003.error"))))),
|
||||
storageNewReadNP(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_IN "/global.error"))))),
|
||||
"102\nlocal-1 process terminated unexpectedly [102]: unable to execute 'pgbackrest-bogus': "
|
||||
"[2] No such file or directory",
|
||||
"check error");
|
||||
"check global error");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
|
Reference in New Issue
Block a user