1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-07-13 01:00:23 +02:00

Add asynchronous, parallel archive-get.

This feature maintains a queue of WAL segments to help reduce latency when PostgreSQL requests a WAL segment with restore_command.
This commit is contained in:
David Steele
2018-04-30 17:27:39 -04:00
parent c48b0a2a1e
commit 54dd6f3ed4
45 changed files with 1354 additions and 216 deletions

View File

@ -26,42 +26,48 @@ testRun()
// -------------------------------------------------------------------------------------------------------------------------
String *segment = strNew("000000010000000100000001");
TEST_RESULT_BOOL(archiveAsyncStatus(segment, false), false, "directory and status file not present");
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), false, "directory and status file not present");
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModeGet, segment, false), false, "directory and status file not present");
// -------------------------------------------------------------------------------------------------------------------------
mkdir(strPtr(strNewFmt("%s/archive", testPath())), 0750);
mkdir(strPtr(strNewFmt("%s/archive/db", testPath())), 0750);
mkdir(strPtr(strNewFmt("%s/archive/db/out", testPath())), 0750);
TEST_RESULT_BOOL(archiveAsyncStatus(segment, false), false, "status file not present");
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), false, "status file not present");
// -------------------------------------------------------------------------------------------------------------------------
storagePutNP(
storageNewWriteNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment))),
bufNewStr(strNew(BOGUS_STR)));
TEST_ERROR(
archiveAsyncStatus(segment, false), FormatError, "000000010000000100000001.ok content must have at least two lines");
archiveAsyncStatus(archiveModePush, segment, false), FormatError,
"000000010000000100000001.ok content must have at least two lines");
storagePutNP(
storageNewWriteNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment))),
bufNewStr(strNew(BOGUS_STR "\n")));
TEST_ERROR(archiveAsyncStatus(segment, false), FormatError, "000000010000000100000001.ok message must be > 0");
TEST_ERROR(
archiveAsyncStatus(archiveModePush, segment, false), FormatError, "000000010000000100000001.ok message must be > 0");
storagePutNP(
storageNewWriteNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment))),
bufNewStr(strNew(BOGUS_STR "\nmessage")));
TEST_ERROR(archiveAsyncStatus(segment, false), FormatError, "unable to convert str 'BOGUS' to int");
TEST_ERROR(archiveAsyncStatus(archiveModePush, segment, false), FormatError, "unable to convert str 'BOGUS' to int");
storagePutNP(storageNewWriteNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment))), NULL);
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), true, "ok file");
storagePutNP(
storageNewWriteNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment))),
bufNewStr(strNew("0\nwarning")));
TEST_RESULT_BOOL(archiveAsyncStatus(segment, false), true, "ok file with warning");
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), true, "ok file with warning");
testLogResult("P00 WARN: warning");
storagePutNP(
storageNewWriteNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment))),
bufNewStr(strNew("25\nerror")));
TEST_RESULT_BOOL(archiveAsyncStatus(segment, false), true, "error status renamed to ok");
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), true, "error status renamed to ok");
testLogResult(
"P00 WARN: WAL segment '000000010000000100000001' was not pushed due to error [25] and was manually skipped: error");
@ -70,20 +76,22 @@ testRun()
storageNewWriteNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment))),
bufNewStr(strNew("")));
TEST_ERROR(
archiveAsyncStatus(segment, false), AssertError,
archiveAsyncStatus(archiveModePush, segment, false), AssertError,
strPtr(
strNewFmt(
"multiple status files found in '%s/archive/db/out' for WAL segment '000000010000000100000001'", testPath())));
unlink(strPtr(storagePathNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)))));
TEST_ERROR(archiveAsyncStatus(segment, true), AssertError, "status file '000000010000000100000001.error' has no content");
TEST_ERROR(
archiveAsyncStatus(archiveModePush, segment, true), AssertError,
"status file '000000010000000100000001.error' has no content");
storagePutNP(
storageNewWriteNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment))),
bufNewStr(strNew("25\nmessage")));
TEST_ERROR(archiveAsyncStatus(segment, true), AssertError, "message");
TEST_ERROR(archiveAsyncStatus(archiveModePush, segment, true), AssertError, "message");
TEST_RESULT_BOOL(archiveAsyncStatus(segment, false), false, "suppress error");
TEST_RESULT_BOOL(archiveAsyncStatus(archiveModePush, segment, false), false, "suppress error");
unlink(strPtr(storagePathNP(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment)))));
}