1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-03-03 14:52:21 +02:00

Convert page checksum filter result to a pack.

The pack is both more compact and more efficient than a variant.

Also aggregate the page error info in the main process rather than in the filter to allow additional LSN filtering, to be added in a future commit.
This commit is contained in:
David Steele 2021-09-24 17:40:31 -04:00
parent ac1f6db4a2
commit c8ea17c68f
12 changed files with 186 additions and 108 deletions

View File

@ -1013,6 +1013,101 @@ backupStop(BackupData *backupData, Manifest *manifest)
FUNCTION_LOG_RETURN_STRUCT(result);
}
/***********************************************************************************************************************************
Convert page checksum error pack to a VariantList
***********************************************************************************************************************************/
// Helper that appends one entry to the result list: a lone page number when pageBegin equals pageEnd, otherwise a nested
// two-element list holding the first and last page of the range
static void
backupJobResultPageChecksumOut(VariantList *const result, const unsigned int pageBegin, const unsigned int pageEnd)
{
    FUNCTION_TEST_BEGIN();
        FUNCTION_TEST_PARAM(VARIANT_LIST, result);
        FUNCTION_TEST_PARAM(UINT, pageBegin);
        FUNCTION_TEST_PARAM(UINT, pageEnd);
    FUNCTION_TEST_END();

    // More than one page, so add a sub-list containing the range bounds
    if (pageBegin != pageEnd)
    {
        VariantList *pageRange = varLstNew();

        varLstAdd(pageRange, varNewUInt64(pageBegin));
        varLstAdd(pageRange, varNewUInt64(pageEnd));
        varLstAdd(result, varNewVarLst(pageRange));
    }
    // Else exactly one page, so add it directly
    else
    {
        varLstAdd(result, varNewUInt64(pageBegin));
    }

    FUNCTION_TEST_RETURN_VOID();
}
// Read the page error array from the pack (when present) and collapse it into a list of single pages and page ranges. Returns
// NULL when the pack holds a null in place of the error array.
static VariantList *
backupJobResultPageChecksum(PackRead *const checksumPageResult)
{
    FUNCTION_LOG_BEGIN(logLevelDebug);
        FUNCTION_LOG_PARAM(PACK_READ, checksumPageResult);
    FUNCTION_LOG_END();

    VariantList *result = NULL;

    // Only build a list when the error array is present
    if (!pckReadNullP(checksumPageResult))
    {
        result = varLstNew();
        pckReadArrayBeginP(checksumPageResult);

        // Set once the first error page has been read, i.e. pageBegin/pageEnd describe a pending range
        bool first = false;
        unsigned int pageBegin = 0;
        unsigned int pageEnd = 0;

        // Walk the error objects and merge consecutive pages into ranges
        while (pckReadNext(checksumPageResult))
        {
            // Each object id is the page number plus one
            const unsigned int objectId = pckReadId(checksumPageResult);
            const unsigned int pageId = objectId - 1;

            pckReadObjBeginP(checksumPageResult, .id = objectId);

            // ??? Discarded for now but will eventually be used for filtering
            pckReadU64P(checksumPageResult);

            // Page directly follows the pending range so just extend the range
            if (first && pageId == pageEnd + 1)
            {
                pageEnd = pageId;
            }
            // Else begin a new single-page range, flushing the pending range first when there is one
            else
            {
                if (first)
                    backupJobResultPageChecksumOut(result, pageBegin, pageEnd);

                pageBegin = pageId;
                pageEnd = pageId;
                first = true;
            }

            pckReadObjEndP(checksumPageResult);
        }

        // The array must have contained at least one error
        CHECK(first);

        // Flush the final pending page or page range
        backupJobResultPageChecksumOut(result, pageBegin, pageEnd);

        pckReadArrayEndP(checksumPageResult);
    }

    FUNCTION_LOG_RETURN(VARIANT_LIST, result);
}
/***********************************************************************************************************************************
Log the results of a job and throw errors
***********************************************************************************************************************************/
@ -1049,7 +1144,7 @@ backupJobResult(
const uint64_t copySize = pckReadU64P(jobResult);
const uint64_t repoSize = pckReadU64P(jobResult);
const String *const copyChecksum = pckReadStrP(jobResult);
const KeyValue *const checksumPageResult = varKv(jsonToVar(pckReadStrP(jobResult, .defaultValue = NULL_STR)));
PackRead *const checksumPageResult = pckReadPackReadP(jobResult);
// Increment backup copy progress
sizeCopied += copySize;
@ -1109,12 +1204,14 @@ backupJobResult(
if (checksumPageResult != NULL)
{
checksumPageErrorList = backupJobResultPageChecksum(checksumPageResult);
// If the checksum was not valid
if (!varBool(kvGet(checksumPageResult, VARSTRDEF("valid"))))
if (!pckReadBoolP(checksumPageResult))
{
checksumPageError = true;
if (!varBool(kvGet(checksumPageResult, VARSTRDEF("align"))))
if (!pckReadBoolP(checksumPageResult))
{
checksumPageErrorList = NULL;
@ -1126,8 +1223,8 @@ backupJobResult(
else
{
// Format the page checksum errors
checksumPageErrorList = varVarLst(kvGet(checksumPageResult, VARSTRDEF("error")));
ASSERT(!varLstEmpty(checksumPageErrorList));
CHECK(checksumPageErrorList != NULL);
CHECK(!varLstEmpty(checksumPageErrorList));
String *error = strNew();
unsigned int errorTotalMin = 0;

View File

@ -251,9 +251,8 @@ backupFile(
// Get results of page checksum validation
if (pgFileChecksumPage)
{
result.pageChecksumResult = jsonToKv(
pckReadStrP(
ioFilterGroupResultP(ioReadFilterGroup(storageReadIo(read)), PAGE_CHECKSUM_FILTER_TYPE)));
result.pageChecksumResult = pckDup(
ioFilterGroupResultPackP(ioReadFilterGroup(storageReadIo(read)), PAGE_CHECKSUM_FILTER_TYPE));
}
}
MEM_CONTEXT_PRIOR_END();

View File

@ -30,7 +30,7 @@ typedef struct BackupFileResult
uint64_t copySize;
String *copyChecksum;
uint64_t repoSize;
KeyValue *pageChecksumResult;
Pack *pageChecksumResult;
} BackupFileResult;
BackupFileResult backupFile(

View File

@ -27,10 +27,7 @@ typedef struct PageChecksum
bool valid; // Is the relation structure valid?
bool align; // Is the relation alignment valid?
VariantList *error; // List of checksum errors
unsigned int errorMin; // Current min error page
unsigned int errorMax; // Current max error page
PackWrite *error; // List of checksum errors
} PageChecksum;
/***********************************************************************************************************************************
@ -121,20 +118,21 @@ pageChecksumProcess(THIS_VOID, const Buffer *input)
continue;
}
// Add the page error
MEM_CONTEXT_BEGIN(this->memContext)
// Create the error list if it does not exist yet
if (this->error == NULL)
{
// Create the error list if it does not exist yet
if (this->error == NULL)
this->error = varLstNew();
// Add page number and lsn to the error list
VariantList *pair = varLstNew();
varLstAdd(pair, varNewUInt(blockNo));
varLstAdd(pair, varNewUInt64(pageLsn));
varLstAdd(this->error, varNewVarLst(pair));
MEM_CONTEXT_BEGIN(this->memContext)
{
this->error = pckWriteNewP();
pckWriteArrayBeginP(this->error);
}
MEM_CONTEXT_END();
}
MEM_CONTEXT_END();
// Add page number and lsn to the error list
pckWriteObjBeginP(this->error, .id = blockNo + 1);
pckWriteU64P(this->error, pageLsn);
pckWriteObjEndP(this->error);
}
this->pageNoOffset += pageTotal;
@ -159,76 +157,31 @@ pageChecksumResult(THIS_VOID)
Pack *result = NULL;
MEM_CONTEXT_TEMP_BEGIN()
MEM_CONTEXT_BEGIN(this->memContext)
{
KeyValue *error = kvNew();
// End the error array
if (this->error != NULL)
{
VariantList *errorList = varLstNew();
unsigned int errorIdx = 0;
// Convert the full list to an abbreviated list. In the future we want to return the entire list so pages can be verified
// in the WAL.
do
{
unsigned int pageId = varUInt(varLstGet(varVarLst(varLstGet(this->error, errorIdx)), 0));
if (errorIdx == varLstSize(this->error) - 1)
{
varLstAdd(errorList, varNewUInt(pageId));
errorIdx++;
}
else
{
unsigned int pageIdNext = varUInt(varLstGet(varVarLst(varLstGet(this->error, errorIdx + 1)), 0));
if (pageIdNext > pageId + 1)
{
varLstAdd(errorList, varNewUInt(pageId));
errorIdx++;
}
else
{
unsigned int pageIdLast = pageIdNext;
errorIdx++;
while (errorIdx < varLstSize(this->error) - 1)
{
pageIdNext = varUInt(varLstGet(varVarLst(varLstGet(this->error, errorIdx + 1)), 0));
if (pageIdNext > pageIdLast + 1)
break;
pageIdLast = pageIdNext;
errorIdx++;
}
VariantList *errorListSub = varLstNew();
varLstAdd(errorListSub, varNewUInt(pageId));
varLstAdd(errorListSub, varNewUInt(pageIdLast));
varLstAdd(errorList, varNewVarLst(errorListSub));
errorIdx++;
}
}
}
while (errorIdx < varLstSize(this->error));
pckWriteArrayEndP(this->error);
this->valid = false;
kvPut(error, varNewStrZ("error"), varNewVarLst(errorList));
}
// Else create a pack to hold the flags
else
{
this->error = pckWriteNewP();
pckWriteNullP(this->error);
}
kvPut(error, VARSTRDEF("valid"), VARBOOL(this->valid));
kvPut(error, VARSTRDEF("align"), VARBOOL(this->align));
// Valid and align flags
pckWriteBoolP(this->error, this->valid, .defaultWrite = true);
pckWriteBoolP(this->error, this->align, .defaultWrite = true);
PackWrite *const packWrite = pckWriteNewP();
// End pack
pckWriteEndP(this->error);
pckWriteStrP(packWrite, jsonFromKv(error));
pckWriteEndP(packWrite);
result = pckMove(pckWriteResult(packWrite), memContextPrior());
result = pckMove(pckWriteResult(this->error), memContextPrior());
}
MEM_CONTEXT_TEMP_END();
MEM_CONTEXT_END();
FUNCTION_LOG_RETURN(PACK, result);
}

View File

@ -55,7 +55,7 @@ backupFileProtocol(PackRead *const param, ProtocolServer *const server)
pckWriteU64P(resultPack, result.copySize);
pckWriteU64P(resultPack, result.repoSize);
pckWriteStrP(resultPack, result.copyChecksum);
pckWriteStrP(resultPack, result.pageChecksumResult != NULL ? jsonFromKv(result.pageChecksumResult) : NULL);
pckWritePackP(resultPack, result.pageChecksumResult);
protocolServerDataPut(server, resultPack);
protocolServerDataEndPut(server);

View File

@ -420,8 +420,8 @@ ioFilterGroupParamAll(const IoFilterGroup *this)
}
/**********************************************************************************************************************************/
PackRead *
ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType, const IoFilterGroupResultParam param)
const Pack *
ioFilterGroupResultPack(const IoFilterGroup *const this, const StringId filterType, const IoFilterGroupResultParam param)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
@ -432,7 +432,7 @@ ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType,
ASSERT(this->pub.opened);
ASSERT(filterType != 0);
PackRead *result = NULL;
const Pack *result = NULL;
// Search for the result
unsigned int foundIdx = 0;
@ -447,7 +447,7 @@ ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType,
// If the index matches return the result
if (foundIdx == param.idx)
{
result = pckReadNew(filterResult->result);
result = filterResult->result;
break;
}
@ -456,7 +456,20 @@ ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType,
}
}
FUNCTION_LOG_RETURN(PACK_READ, result);
FUNCTION_LOG_RETURN_CONST(PACK, result);
}
/**********************************************************************************************************************************/
PackRead *
ioFilterGroupResult(const IoFilterGroup *const this, const StringId filterType, const IoFilterGroupResultParam param)
{
FUNCTION_LOG_BEGIN(logLevelDebug);
FUNCTION_LOG_PARAM(IO_FILTER_GROUP, this);
FUNCTION_LOG_PARAM(STRING_ID, filterType);
FUNCTION_LOG_PARAM(UINT, param.idx);
FUNCTION_LOG_END();
FUNCTION_LOG_RETURN(PACK_READ, pckReadNew(ioFilterGroupResultPack(this, filterType, param)));
}
/**********************************************************************************************************************************/

View File

@ -73,6 +73,11 @@ typedef struct IoFilterGroupResultParam
PackRead *ioFilterGroupResult(const IoFilterGroup *this, StringId filterType, IoFilterGroupResultParam param);
#define ioFilterGroupResultPackP(this, filterType, ...) \
ioFilterGroupResultPack(this, filterType, (IoFilterGroupResultParam){VAR_PARAM_INIT, __VA_ARGS__})
const Pack *ioFilterGroupResultPack(const IoFilterGroup *this, StringId filterType, IoFilterGroupResultParam param);
// Get/set all filter results
Pack *ioFilterGroupResultAll(const IoFilterGroup *this);
void ioFilterGroupResultAllSet(IoFilterGroup *this, const Pack *filterResult);

View File

@ -134,6 +134,13 @@ typedef enum
/***********************************************************************************************************************************
Pack Functions
***********************************************************************************************************************************/
// Duplicate pack
__attribute__((always_inline)) static inline Pack *
pckDup(const Pack *const this)
{
return (Pack *)bufDup((const Buffer *)this);
}
// Cast Buffer to Pack
__attribute__((always_inline)) static inline const Pack *
pckFromBuf(const Buffer *const buffer)

View File

@ -8,6 +8,8 @@ Test Common Functions and Definitions for Backup and Expire Commands
#include "postgres/interface/static.vendor.h"
#include "storage/posix/storage.h"
#include "common/harnessPack.h"
/***********************************************************************************************************************************
Test Run
***********************************************************************************************************************************/
@ -140,8 +142,8 @@ testRun(void)
ioWriteClose(write);
TEST_RESULT_STR_Z(
pckReadStrP(ioFilterGroupResultP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"{\"align\":true,\"valid\":true}", "all zero pages");
hrnPackToStr(ioFilterGroupResultPackP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"2:bool:true, 3:bool:true", "all zero pages");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("single valid page");
@ -174,8 +176,8 @@ testRun(void)
ioWriteClose(write);
TEST_RESULT_STR_Z(
pckReadStrP(ioFilterGroupResultP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"{\"align\":true,\"valid\":true}", "single valid page");
hrnPackToStr(ioFilterGroupResultPackP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"2:bool:true, 3:bool:true", "single valid page");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("single checksum error");
@ -205,8 +207,8 @@ testRun(void)
ioWriteClose(write);
TEST_RESULT_STR_Z(
pckReadStrP(ioFilterGroupResultP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"{\"align\":true,\"error\":[0],\"valid\":false}", "single checksum error");
hrnPackToStr(ioFilterGroupResultPackP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"1:array:[1:obj:{1:u64:17361641481138401520}], 2:bool:false, 3:bool:true", "single checksum error");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("various checksum errors some of which will be skipped because of the LSN");
@ -297,8 +299,10 @@ testRun(void)
ioWriteClose(write);
TEST_RESULT_STR_Z(
pckReadStrP(ioFilterGroupResultP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"{\"align\":false,\"error\":[0,[2,4],[6,7]],\"valid\":false}", "various checksum errors");
hrnPackToStr(ioFilterGroupResultPackP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"1:array:[1:obj:{1:u64:17361641481138401520}, 3:obj:{1:u64:2}, 4:obj:{1:u64:3}, 5:obj:{1:u64:4}, 7:obj:{1:u64:6},"
" 8:obj:{1:u64:7}], 2:bool:false, 3:bool:false",
"various checksum errors");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("impossibly misaligned page");
@ -316,8 +320,8 @@ testRun(void)
ioWriteClose(write);
TEST_RESULT_STR_Z(
pckReadStrP(ioFilterGroupResultP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"{\"align\":false,\"valid\":false}", "misalignment");
hrnPackToStr(ioFilterGroupResultPackP(ioWriteFilterGroup(write), PAGE_CHECKSUM_FILTER_TYPE)),
"2:bool:false, 3:bool:false", "misalignment");
// -------------------------------------------------------------------------------------------------------------------------
TEST_TITLE("two misaligned buffers in a row");

View File

@ -12,6 +12,7 @@ Test Backup Command
#include "common/harnessConfig.h"
#include "common/harnessPostgres.h"
#include "common/harnessPq.h"
#include "common/harnessPack.h"
#include "common/harnessProtocol.h"
#include "common/harnessStorage.h"
@ -545,8 +546,7 @@ testRun(void)
TEST_RESULT_UINT(result.repoSize, 9, "repo=pgFile size");
TEST_RESULT_UINT(result.backupCopyResult, backupCopyResultCopy, "copy file");
TEST_RESULT_STR_Z(result.copyChecksum, "9bc8ab2dda60ef4beed07d1e19ce0676d5edde67", "copy checksum matches");
TEST_RESULT_PTR_NE(result.pageChecksumResult, NULL, "pageChecksumResult is set");
TEST_RESULT_BOOL(varBool(kvGet(result.pageChecksumResult, VARSTRDEF("valid"))), false, "pageChecksumResult valid=false");
TEST_RESULT_STR_Z(hrnPackToStr(result.pageChecksumResult), "2:bool:false, 3:bool:false", "pageChecksumResult");
TEST_STORAGE_EXISTS(storageRepoWrite(), strZ(backupPathFile), .remove = true, .comment = "check exists in repo, remove");
// -------------------------------------------------------------------------------------------------------------------------
@ -579,7 +579,7 @@ testRun(void)
TEST_RESULT_UINT(result.repoSize, 12, "repo size");
TEST_RESULT_UINT(result.backupCopyResult, backupCopyResultCopy, "copy file");
TEST_RESULT_STR_Z(result.copyChecksum, "c3ae4687ea8ccd47bfdb190dbe7fd3b37545fdb9", "checksum");
TEST_RESULT_STR_Z(jsonFromKv(result.pageChecksumResult), "{\"align\":false,\"valid\":false}", "page checksum");
TEST_RESULT_STR_Z(hrnPackToStr(result.pageChecksumResult), "2:bool:false, 3:bool:false", "page checksum");
TEST_STORAGE_GET(storageRepo(), strZ(backupPathFile), "atestfile###");
// -------------------------------------------------------------------------------------------------------------------------

View File

@ -344,7 +344,7 @@ testRun(void)
TEST_RESULT_VOID(pckWritePtrP(packWrite, "sample"), "write pointer");
TEST_RESULT_VOID(pckWriteEndP(packWrite), "write end");
TEST_ASSIGN(packRead, pckReadNew(pckWriteResult(packWrite)), "new read");
TEST_ASSIGN(packRead, pckReadNew(pckDup(pckWriteResult(packWrite))), "new read");
TEST_RESULT_Z(pckReadPtrP(packRead), NULL, "read default pointer");
TEST_RESULT_Z(pckReadPtrP(packRead, .id = 2), "sample", "read pointer");

View File

@ -307,8 +307,8 @@ testRun(void)
TEST_RESULT_STR_Z(
hrnPackToStr(ioFilterGroupResultAll(filterGroup)),
"1:strid:size, 2:pack:<1:u64:8>, 3:strid:hash, 4:pack:<1:str:bbbcf2c59433f68f22376cd2439d6cd309378df6>,"
" 5:strid:pg-chksum, 6:pack:<1:str:{\"align\":false,\"valid\":false}>, 7:strid:cipher-blk, 9:strid:cipher-blk,"
" 11:strid:gz-cmp, 13:strid:gz-dcmp, 15:strid:buffer",
" 5:strid:pg-chksum, 6:pack:<2:bool:false, 3:bool:false>, 7:strid:cipher-blk, 9:strid:cipher-blk, 11:strid:gz-cmp,"
" 13:strid:gz-dcmp, 15:strid:buffer",
"filter results");
// Check protocol function directly (file exists but all data goes to sink)