Mirror of https://github.com/pgbackrest/pgbackrest.git
Improve archive-get performance.
Check that archive files exist in the main process instead of the local process. This means that the archive.info file only needs to be loaded once per execution rather than once per file to get.

Stop looking when a file is missing or in error. PostgreSQL will never request anything past the missing file so there is no point in getting them. This also reduces "unable to find" logging in the async process.

Cache results of storageList() when looking for multiple files to reduce storage I/O.

Look for all requested archive files in the archive-id where the first file is found. They may not all be there, but this reduces the number of list calls. If subsequent files are in another archive id they will be found on the next archive-get call.
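At a high level the main process now maps every requested file to its actual repo name before any copying starts, reusing one directory listing per 16-character WAL path and stopping at the first file it cannot map. The following standalone sketch illustrates only that caching-and-early-stop pattern; it uses plain POSIX directory listing instead of pgBackRest's storage layer, and the ListCache type and findWithCache() helper are invented for the example.

/* Minimal sketch (not pgBackRest code): reuse one directory listing per 16-character WAL path and stop at the first
** segment that cannot be found. ListCache and findWithCache() are invented for this illustration; the commit itself
** uses storageListP() and a List-based ArchiveGetFindCache. */
#define _POSIX_C_SOURCE 200809L

#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define WAL_PATH_LEN 16                             // First 16 characters of a WAL segment name identify its directory

typedef struct
{
    char path[WAL_PATH_LEN + 1];                    // Directory whose listing is cached
    char **file;                                    // Cached file names
    size_t fileTotal;
} ListCache;

// Return the stored file name (segment plus checksum/extension suffix) or NULL if not found
static const char *
findWithCache(ListCache *cache, const char *repoPath, const char *walSegment)
{
    char path[WAL_PATH_LEN + 1];
    snprintf(path, sizeof(path), "%.16s", walSegment);

    // On cache miss list the directory once and keep the result for subsequent requests
    if (strcmp(cache->path, path) != 0)
    {
        for (size_t i = 0; i < cache->fileTotal; i++)
            free(cache->file[i]);

        free(cache->file);
        cache->file = NULL;
        cache->fileTotal = 0;
        strcpy(cache->path, path);

        char dirPath[4096];
        snprintf(dirPath, sizeof(dirPath), "%s/%s", repoPath, path);
        DIR *dir = opendir(dirPath);

        if (dir != NULL)
        {
            struct dirent *entry;

            while ((entry = readdir(dir)) != NULL)
            {
                cache->file = realloc(cache->file, sizeof(char *) * (cache->fileTotal + 1));
                cache->file[cache->fileTotal++] = strdup(entry->d_name);
            }

            closedir(dir);
        }
    }

    // Match the requested segment against the cached listing
    for (size_t i = 0; i < cache->fileTotal; i++)
        if (strncmp(cache->file[i], walSegment, strlen(walSegment)) == 0)
            return cache->file[i];

    return NULL;
}

int
main(int argc, char **argv)
{
    if (argc < 3)
    {
        fprintf(stderr, "usage: %s <repo-archive-path> <wal-segment>...\n", argv[0]);
        return 1;
    }

    ListCache cache = {{0}};

    // Stop at the first missing segment -- PostgreSQL will never request anything past it
    for (int argIdx = 2; argIdx < argc; argIdx++)
    {
        const char *actual = findWithCache(&cache, argv[1], argv[argIdx]);

        if (actual == NULL)
        {
            printf("unable to find %s in the archive\n", argv[argIdx]);
            break;
        }

        printf("found %s as %s\n", argv[argIdx], actual);
    }

    return 0;
}

In the commit this pattern lives in archiveGetFind(), which caches storageListP() results in an ArchiveGetFindCache list, while archiveGetCheck() breaks out of its loop as soon as a requested file cannot be mapped, so archive.info is read once per execution and the async process logs fewer "unable to find" messages.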
@ -25,6 +25,16 @@
            </release-item>
        </release-bug-list>

        <release-improvement-list>
            <release-item>
                <release-item-contributor-list>
                    <release-item-reviewer id="cynthia.shang"/>
                </release-item-contributor-list>

                <p>Improve <cmd>archive-get</cmd> performance.</p>
            </release-item>
        </release-improvement-list>

        <release-development-list>
            <release-item>
                <release-item-contributor-list>
@ -16,98 +16,11 @@ Archive Get File
|
||||
#include "postgres/interface.h"
|
||||
#include "storage/helper.h"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Check if a WAL file exists in the repository
|
||||
***********************************************************************************************************************************/
|
||||
typedef struct ArchiveGetCheckResult
|
||||
{
|
||||
String *archiveFileActual;
|
||||
String *cipherPass;
|
||||
} ArchiveGetCheckResult;
|
||||
|
||||
static ArchiveGetCheckResult
|
||||
archiveGetCheck(const String *archiveFile, CipherType cipherType, const String *cipherPass)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(STRING, archiveFile);
|
||||
FUNCTION_LOG_PARAM(ENUM, cipherType);
|
||||
FUNCTION_TEST_PARAM(STRING, cipherPass);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(archiveFile != NULL);
|
||||
|
||||
ArchiveGetCheckResult result = {0};
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Get pg control info
|
||||
PgControl controlInfo = pgControlFromFile(storagePg());
|
||||
|
||||
// Attempt to load the archive info file
|
||||
InfoArchive *info = infoArchiveLoadFile(storageRepo(), INFO_ARCHIVE_PATH_FILE_STR, cipherType, cipherPass);
|
||||
|
||||
// Loop through the pg history in case the WAL we need is not in the most recent archive id
|
||||
String *archiveId = NULL;
|
||||
const String *archiveFileActual = NULL;
|
||||
|
||||
for (unsigned int pgIdx = 0; pgIdx < infoPgDataTotal(infoArchivePg(info)); pgIdx++)
|
||||
{
|
||||
InfoPgData pgData = infoPgData(infoArchivePg(info), pgIdx);
|
||||
|
||||
// Only use the archive id if it matches the current cluster
|
||||
if (pgData.systemId == controlInfo.systemId && pgData.version == controlInfo.version)
|
||||
{
|
||||
archiveId = infoPgArchiveId(infoArchivePg(info), pgIdx);
|
||||
|
||||
// If a WAL segment search among the possible file names
|
||||
if (walIsSegment(archiveFile))
|
||||
{
|
||||
String *walSegmentFile = walSegmentFind(storageRepo(), archiveId, archiveFile, 0);
|
||||
|
||||
if (walSegmentFile != NULL)
|
||||
{
|
||||
archiveFileActual = strNewFmt("%s/%s", strZ(strSubN(archiveFile, 0, 16)), strZ(walSegmentFile));
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Else if not a WAL segment, see if it exists in the archive dir
|
||||
else if (
|
||||
storageExistsP(storageRepo(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(archiveId), strZ(archiveFile))))
|
||||
{
|
||||
archiveFileActual = archiveFile;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Error if no archive id was found -- this indicates a mismatch with the current cluster
|
||||
if (archiveId == NULL)
|
||||
{
|
||||
THROW_FMT(
|
||||
ArchiveMismatchError, "unable to retrieve the archive id for database version '%s' and system-id '%" PRIu64 "'",
|
||||
strZ(pgVersionToStr(controlInfo.version)), controlInfo.systemId);
|
||||
}
|
||||
|
||||
if (archiveFileActual != NULL)
|
||||
{
|
||||
MEM_CONTEXT_PRIOR_BEGIN()
|
||||
{
|
||||
result.archiveFileActual = strNewFmt("%s/%s", strZ(archiveId), strZ(archiveFileActual));
|
||||
result.cipherPass = strDup(infoArchiveCipherPass(info));
|
||||
}
|
||||
MEM_CONTEXT_PRIOR_END();
|
||||
}
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
FUNCTION_LOG_RETURN_STRUCT(result);
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
int
|
||||
void
|
||||
archiveGetFile(
|
||||
const Storage *storage, const String *archiveFile, const String *walDestination, bool durable, CipherType cipherType,
|
||||
const String *cipherPass)
|
||||
const String *cipherPassArchive)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(STORAGE, storage);
|
||||
@ -115,15 +28,12 @@ archiveGetFile(
|
||||
FUNCTION_LOG_PARAM(STRING, walDestination);
|
||||
FUNCTION_LOG_PARAM(BOOL, durable);
|
||||
FUNCTION_LOG_PARAM(ENUM, cipherType);
|
||||
FUNCTION_TEST_PARAM(STRING, cipherPass);
|
||||
FUNCTION_TEST_PARAM(STRING, cipherPassArchive);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(archiveFile != NULL);
|
||||
ASSERT(walDestination != NULL);
|
||||
|
||||
// By default result indicates WAL segment not found
|
||||
int result = 1;
|
||||
|
||||
// Is the file compressible during the copy?
|
||||
bool compressible = true;
|
||||
|
||||
@ -131,27 +41,21 @@ archiveGetFile(
|
||||
lockStopTest();
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Make sure the file exists and other checks pass
|
||||
ArchiveGetCheckResult archiveGetCheckResult = archiveGetCheck(archiveFile, cipherType, cipherPass);
|
||||
|
||||
if (archiveGetCheckResult.archiveFileActual != NULL)
|
||||
{
|
||||
StorageWrite *destination = storageNewWriteP(
|
||||
storage, walDestination, .noCreatePath = true, .noSyncFile = !durable, .noSyncPath = !durable,
|
||||
.noAtomic = !durable);
|
||||
storage, walDestination, .noCreatePath = true, .noSyncFile = !durable, .noSyncPath = !durable, .noAtomic = !durable);
|
||||
|
||||
// If there is a cipher then add the decrypt filter
|
||||
if (cipherType != cipherTypeNone)
|
||||
{
|
||||
ioFilterGroupAdd(
|
||||
ioWriteFilterGroup(storageWriteIo(destination)), cipherBlockNew(cipherModeDecrypt, cipherType,
|
||||
BUFSTR(archiveGetCheckResult.cipherPass), NULL));
|
||||
ioWriteFilterGroup(storageWriteIo(destination)),
|
||||
cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(cipherPassArchive), NULL));
|
||||
compressible = false;
|
||||
}
|
||||
|
||||
// If file is compressed then add the decompression filter
|
||||
CompressType compressType = compressTypeFromName(archiveGetCheckResult.archiveFileActual);
|
||||
CompressType compressType = compressTypeFromName(archiveFile);
|
||||
|
||||
if (compressType != compressTypeNone)
|
||||
{
|
||||
@ -161,16 +65,10 @@ archiveGetFile(
|
||||
|
||||
// Copy the file
|
||||
storageCopyP(
|
||||
storageNewReadP(
|
||||
storageRepo(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s", strZ(archiveGetCheckResult.archiveFileActual)),
|
||||
.compressible = compressible),
|
||||
storageNewReadP(storageRepo(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s", strZ(archiveFile)), .compressible = compressible),
|
||||
destination);
|
||||
|
||||
// The WAL file was found
|
||||
result = 0;
|
||||
}
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(INT, result);
|
||||
FUNCTION_LOG_RETURN_VOID();
|
||||
}
|
||||
|
@ -12,8 +12,8 @@ Archive Get File
Functions
***********************************************************************************************************************************/
// Copy a file from the archive to the specified destination
int archiveGetFile(
void archiveGetFile(
    const Storage *storage, const String *archiveFile, const String *walDestination, bool durable, CipherType cipherType,
    const String *cipherPass);
    const String *cipherPassArchive);

#endif
@ -20,6 +20,7 @@ Archive Get Command
|
||||
#include "common/wait.h"
|
||||
#include "config/config.h"
|
||||
#include "config/exec.h"
|
||||
#include "info/infoArchive.h"
|
||||
#include "postgres/interface.h"
|
||||
#include "protocol/helper.h"
|
||||
#include "protocol/parallel.h"
|
||||
@ -29,10 +30,250 @@ Archive Get Command
|
||||
Constants for log messages that are used multiple times to keep them consistent
|
||||
***********************************************************************************************************************************/
|
||||
#define FOUND_IN_ARCHIVE_MSG "found %s in the archive"
|
||||
#define FOUND_IN_REPO_ARCHIVE_MSG "found %s in the repo%u archive"
|
||||
#define FOUND_IN_REPO_ARCHIVE_MSG "found %s in the repo%u:%s archive"
|
||||
#define UNABLE_TO_FIND_IN_ARCHIVE_MSG "unable to find %s in the archive"
|
||||
#define COULD_NOT_GET_FROM_REPO_ARCHIVE_MSG \
|
||||
"could not get %s from the repo%u archive (will be retried): [%d] %s"
|
||||
#define COULD_NOT_GET_FROM_REPO_ARCHIVE_MSG "could not get %s from the repo%u:%s archive (will be retried):"
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Check for a list of archive files in the repository
|
||||
***********************************************************************************************************************************/
|
||||
typedef struct ArchiveFileMap
|
||||
{
|
||||
const String *request; // Archive file requested by archive_command
|
||||
const String *actual; // Actual file in the repo (with path, checksum, ext, etc.)
|
||||
} ArchiveFileMap;
|
||||
|
||||
typedef struct ArchiveGetCheckResult
|
||||
{
|
||||
List *archiveFileMapList; // List of mapped archive files, i.e. found in the repo
|
||||
|
||||
const String *archiveId; // Repo archive id
|
||||
CipherType cipherType; // Repo cipher type
|
||||
const String *cipherPassArchive; // Repo archive cipher pass
|
||||
|
||||
const ErrorType *errorType; // Error type if there was an error
|
||||
const String *errorFile; // Error file if there was an error
|
||||
const String *errorMessage; // Error message if there was an error
|
||||
} ArchiveGetCheckResult;
|
||||
|
||||
// Helper to find a single archive file in the repository using a cache to speed up the process and minimize storageListP() calls
|
||||
typedef struct ArchiveGetFindCache
|
||||
{
|
||||
const String *path;
|
||||
const StringList *fileList;
|
||||
} ArchiveGetFindCache;
|
||||
|
||||
static bool
|
||||
archiveGetFind(
|
||||
const String *archiveFileRequest, const String *archiveId, ArchiveGetCheckResult *getCheckResult, List *cache, bool single)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(STRING, archiveFileRequest);
|
||||
FUNCTION_LOG_PARAM(STRING, archiveId);
|
||||
FUNCTION_LOG_PARAM_P(VOID, getCheckResult);
|
||||
FUNCTION_LOG_PARAM(LIST, cache);
|
||||
FUNCTION_LOG_PARAM(BOOL, single);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(archiveFileRequest != NULL);
|
||||
ASSERT(archiveId != NULL);
|
||||
ASSERT(getCheckResult != NULL);
|
||||
ASSERT(cache != NULL);
|
||||
|
||||
ArchiveFileMap archiveFileMap = {0};
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// If a WAL segment search among the possible file names
|
||||
if (walIsSegment(archiveFileRequest))
|
||||
{
|
||||
// Get the path
|
||||
const String *path = strSubN(archiveFileRequest, 0, 16);
|
||||
|
||||
// List to hold matches for the requested file
|
||||
StringList *matchList = NULL;
|
||||
|
||||
// If a single file is requested then optimize by adding a more restrictive expression to reduce network bandwidth
|
||||
if (single)
|
||||
{
|
||||
matchList = storageListP(
|
||||
storageRepo(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(archiveId), strZ(path)),
|
||||
.expression = strNewFmt(
|
||||
"^%s%s-[0-f]{40}" COMPRESS_TYPE_REGEXP "{0,1}$", strZ(strSubN(archiveFileRequest, 0, 24)),
|
||||
walIsPartial(archiveFileRequest) ? WAL_SEGMENT_PARTIAL_EXT : ""));
|
||||
}
|
||||
// Else multiple files will be requested so cache list results
|
||||
else
|
||||
{
|
||||
// Partial files cannot be in a list with multiple requests
|
||||
ASSERT(!walIsPartial(archiveFileRequest));
|
||||
|
||||
// If the path does not exist in the cache then fetch it
|
||||
const ArchiveGetFindCache *cachePath = lstFind(cache, &path);
|
||||
|
||||
if (cachePath == NULL)
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(lstMemContext(cache))
|
||||
{
|
||||
cachePath = lstAdd(
|
||||
cache,
|
||||
&(ArchiveGetFindCache)
|
||||
{
|
||||
.path = strDup(path),
|
||||
.fileList = storageListP(
|
||||
storageRepo(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(archiveId), strZ(path)),
|
||||
.expression = strNewFmt("^%s[0-F]{8}-[0-f]{40}" COMPRESS_TYPE_REGEXP "{0,1}$", strZ(path))),
|
||||
});
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
}
|
||||
|
||||
// Get a list of all WAL segments that match
|
||||
matchList = strLstNew();
|
||||
|
||||
for (unsigned int fileIdx = 0; fileIdx < strLstSize(cachePath->fileList); fileIdx++)
|
||||
{
|
||||
if (strBeginsWith(strLstGet(cachePath->fileList, fileIdx), archiveFileRequest))
|
||||
strLstAdd(matchList, strLstGet(cachePath->fileList, fileIdx));
|
||||
}
|
||||
}
|
||||
|
||||
// If there is a single result then return it
|
||||
if (strLstSize(matchList) == 1)
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
|
||||
{
|
||||
archiveFileMap.actual = strNewFmt(
|
||||
"%s/%s/%s", strZ(archiveId), strZ(path), strZ(strLstGet(matchList, 0)));
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
}
|
||||
// Else error if there are multiple results
|
||||
else if (strLstSize(matchList) > 1)
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
|
||||
{
|
||||
getCheckResult->errorType = &ArchiveDuplicateError;
|
||||
getCheckResult->errorFile = strDup(archiveFileRequest);
|
||||
getCheckResult->errorMessage = strNewFmt(
|
||||
"duplicates found in the repo%u:%s archive for WAL segment %s: %s\n"
|
||||
"HINT: are multiple primaries archiving to this stanza?",
|
||||
cfgOptionGroupIdxToKey(cfgOptGrpRepo, cfgOptionGroupIdxDefault(cfgOptGrpRepo)), strZ(archiveId),
|
||||
strZ(archiveFileRequest), strZ(strLstJoin(strLstSort(matchList, sortOrderAsc), ", ")));
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
}
|
||||
}
|
||||
// Else if not a WAL segment, see if it exists in the archive dir
|
||||
else if (storageExistsP(storageRepo(), strNewFmt(STORAGE_REPO_ARCHIVE "/%s/%s", strZ(archiveId), strZ(archiveFileRequest))))
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
|
||||
{
|
||||
archiveFileMap.actual = strNewFmt("%s/%s", strZ(archiveId), strZ(archiveFileRequest));
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
}
|
||||
|
||||
if (archiveFileMap.actual != NULL)
|
||||
{
|
||||
MEM_CONTEXT_BEGIN(lstMemContext(getCheckResult->archiveFileMapList))
|
||||
{
|
||||
archiveFileMap.request = strDup(archiveFileRequest);
|
||||
}
|
||||
MEM_CONTEXT_END();
|
||||
|
||||
lstAdd(getCheckResult->archiveFileMapList, &archiveFileMap);
|
||||
}
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
FUNCTION_LOG_RETURN(BOOL, archiveFileMap.actual != NULL);
|
||||
}
|
||||
|
||||
static ArchiveGetCheckResult
|
||||
archiveGetCheck(const StringList *archiveRequestList)
|
||||
{
|
||||
FUNCTION_LOG_BEGIN(logLevelDebug);
|
||||
FUNCTION_LOG_PARAM(STRING_LIST, archiveRequestList);
|
||||
FUNCTION_LOG_END();
|
||||
|
||||
ASSERT(archiveRequestList != NULL);
|
||||
ASSERT(strLstSize(archiveRequestList) > 0);
|
||||
|
||||
ArchiveGetCheckResult result = {.archiveFileMapList = lstNewP(sizeof(ArchiveFileMap))};
|
||||
|
||||
MEM_CONTEXT_TEMP_BEGIN()
|
||||
{
|
||||
// Get pg control info
|
||||
PgControl controlInfo = pgControlFromFile(storagePg());
|
||||
|
||||
// Get the repo storage in case it is remote and encryption settings need to be pulled down
|
||||
storageRepo();
|
||||
|
||||
result.cipherType = cipherType(cfgOptionStr(cfgOptRepoCipherType));
|
||||
|
||||
// Attempt to load the archive info file
|
||||
InfoArchive *info = infoArchiveLoadFile(
|
||||
storageRepo(), INFO_ARCHIVE_PATH_FILE_STR, result.cipherType, cfgOptionStrNull(cfgOptRepoCipherPass));
|
||||
|
||||
// Loop through the pg history and determine which archiveId to use based on the first file in the list
|
||||
bool found = false;
|
||||
const String *archiveId = NULL;
|
||||
List *cache = NULL;
|
||||
|
||||
for (unsigned int pgIdx = 0; pgIdx < infoPgDataTotal(infoArchivePg(info)); pgIdx++)
|
||||
{
|
||||
InfoPgData pgData = infoPgData(infoArchivePg(info), pgIdx);
|
||||
|
||||
// Only use the archive id if it matches the current cluster
|
||||
if (pgData.systemId == controlInfo.systemId && pgData.version == controlInfo.version)
|
||||
{
|
||||
archiveId = infoPgArchiveId(infoArchivePg(info), pgIdx);
|
||||
cache = lstNewP(sizeof(ArchiveGetFindCache), .comparator = lstComparatorStr);
|
||||
|
||||
found = archiveGetFind(
|
||||
strLstGet(archiveRequestList, 0), archiveId, &result, cache, strLstSize(archiveRequestList) == 1);
|
||||
|
||||
// If the file was found then use this archiveId for the rest of the files
|
||||
if (found)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Error if no archive id was found -- this indicates a mismatch with the current cluster
|
||||
if (archiveId == NULL)
|
||||
{
|
||||
THROW_FMT(
|
||||
ArchiveMismatchError, "unable to retrieve the archive id for database version '%s' and system-id '%" PRIu64 "'",
|
||||
strZ(pgVersionToStr(controlInfo.version)), controlInfo.systemId);
|
||||
}
|
||||
|
||||
// Copy repo data to result if the first file was found or on error
|
||||
if (found || result.errorType != NULL)
|
||||
{
|
||||
MEM_CONTEXT_PRIOR_BEGIN()
|
||||
{
|
||||
result.archiveId = strDup(archiveId);
|
||||
result.cipherPassArchive = strDup(infoArchiveCipherPass(info));
|
||||
}
|
||||
MEM_CONTEXT_PRIOR_END();
|
||||
}
|
||||
|
||||
// Continue only if the first file was found
|
||||
if (found)
|
||||
{
|
||||
// Find the rest of the files in the list
|
||||
for (unsigned int archiveRequestIdx = 1; archiveRequestIdx < strLstSize(archiveRequestList); archiveRequestIdx++)
|
||||
{
|
||||
if (!archiveGetFind(strLstGet(archiveRequestList, archiveRequestIdx), archiveId, &result, cache, false))
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
MEM_CONTEXT_TEMP_END();
|
||||
|
||||
FUNCTION_LOG_RETURN_STRUCT(result);
|
||||
}
|
||||
|
||||
/***********************************************************************************************************************************
|
||||
Clean the queue and prepare a list of WAL segments that the async process should get
|
||||
@ -272,20 +513,31 @@ cmdArchiveGet(void)
|
||||
// Else perform synchronous get
|
||||
else
|
||||
{
|
||||
// Get the repo storage in case it is remote and encryption settings need to be pulled down
|
||||
storageRepo();
|
||||
// Check for the archive file
|
||||
StringList *archiveRequestList = strLstNew();
|
||||
strLstAdd(archiveRequestList, walSegment);
|
||||
|
||||
ArchiveGetCheckResult checkResult = archiveGetCheck(archiveRequestList);
|
||||
|
||||
// If there was an error then throw it
|
||||
if (checkResult.errorType != NULL)
|
||||
THROW_CODE(errorTypeCode(checkResult.errorType), strZ(checkResult.errorMessage));
|
||||
|
||||
// Get the archive file
|
||||
result = archiveGetFile(
|
||||
storageLocalWrite(), walSegment, walDestination, false, cipherType(cfgOptionStr(cfgOptRepoCipherType)),
|
||||
cfgOptionStrNull(cfgOptRepoCipherPass));
|
||||
|
||||
// Log that the file was found
|
||||
if (result == 0)
|
||||
if (lstSize(checkResult.archiveFileMapList) > 0)
|
||||
{
|
||||
ASSERT(lstSize(checkResult.archiveFileMapList) == 1);
|
||||
|
||||
archiveGetFile(
|
||||
storageLocalWrite(), ((ArchiveFileMap *)lstGet(checkResult.archiveFileMapList, 0))->actual, walDestination,
|
||||
false, checkResult.cipherType, checkResult.cipherPassArchive);
|
||||
|
||||
// If there was no error then the file existed
|
||||
LOG_INFO_FMT(
|
||||
FOUND_IN_REPO_ARCHIVE_MSG, strZ(walSegment),
|
||||
cfgOptionGroupIdxToKey(cfgOptGrpRepo, cfgOptionGroupIdxDefault(cfgOptGrpRepo)));
|
||||
cfgOptionGroupIdxToKey(cfgOptGrpRepo, cfgOptionGroupIdxDefault(cfgOptGrpRepo)), strZ(checkResult.archiveId));
|
||||
|
||||
result = 0;
|
||||
}
|
||||
// Else log that the file was not found
|
||||
else
|
||||
@ -298,12 +550,6 @@ cmdArchiveGet(void)
|
||||
}
|
||||
|
||||
/**********************************************************************************************************************************/
|
||||
typedef struct ArchiveGetAsyncData
|
||||
{
|
||||
const StringList *walSegmentList; // List of wal segments to process
|
||||
unsigned int walSegmentIdx; // Current index in the list to be processed
|
||||
} ArchiveGetAsyncData;
|
||||
|
||||
static ProtocolParallelJob *archiveGetAsyncCallback(void *data, unsigned int clientIdx)
|
||||
{
|
||||
FUNCTION_TEST_BEGIN();
|
||||
@ -315,17 +561,20 @@ static ProtocolParallelJob *archiveGetAsyncCallback(void *data, unsigned int cli
|
||||
(void)clientIdx;
|
||||
|
||||
// Get a new job if there are any left
|
||||
ArchiveGetAsyncData *jobData = data;
|
||||
ArchiveGetCheckResult *checkResult = data;
|
||||
|
||||
if (jobData->walSegmentIdx < strLstSize(jobData->walSegmentList))
|
||||
if (lstSize(checkResult->archiveFileMapList) > 0)
|
||||
{
|
||||
const String *walSegment = strLstGet(jobData->walSegmentList, jobData->walSegmentIdx);
|
||||
jobData->walSegmentIdx++;
|
||||
const ArchiveFileMap archiveFileMap = *((ArchiveFileMap *)lstGet(checkResult->archiveFileMapList, 0));
|
||||
lstRemoveIdx(checkResult->archiveFileMapList, 0);
|
||||
|
||||
ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_ARCHIVE_GET_STR);
|
||||
protocolCommandParamAdd(command, VARSTR(walSegment));
|
||||
protocolCommandParamAdd(command, VARSTR(archiveFileMap.request));
|
||||
protocolCommandParamAdd(command, VARSTR(archiveFileMap.actual));
|
||||
protocolCommandParamAdd(command, VARUINT(checkResult->cipherType));
|
||||
protocolCommandParamAdd(command, VARSTR(checkResult->cipherPassArchive));
|
||||
|
||||
FUNCTION_TEST_RETURN(protocolParallelJobNew(VARSTR(walSegment), command));
|
||||
FUNCTION_TEST_RETURN(protocolParallelJobNew(VARSTR(archiveFileMap.request), command));
|
||||
}
|
||||
|
||||
FUNCTION_TEST_RETURN(NULL);
|
||||
@ -344,21 +593,30 @@ cmdArchiveGetAsync(void)
|
||||
pgIsLocalVerify();
|
||||
|
||||
// Check the parameters
|
||||
ArchiveGetAsyncData jobData = {.walSegmentList = cfgCommandParam()};
|
||||
|
||||
if (strLstSize(jobData.walSegmentList) < 1)
|
||||
if (strLstSize(cfgCommandParam()) < 1)
|
||||
THROW(ParamInvalidError, "at least one wal segment is required");
|
||||
|
||||
LOG_INFO_FMT(
|
||||
"get %u WAL file(s) from archive: %s%s",
|
||||
strLstSize(jobData.walSegmentList), strZ(strLstGet(jobData.walSegmentList, 0)),
|
||||
strLstSize(jobData.walSegmentList) == 1 ?
|
||||
"" :
|
||||
strZ(strNewFmt("...%s", strZ(strLstGet(jobData.walSegmentList, strLstSize(jobData.walSegmentList) - 1)))));
|
||||
strLstSize(cfgCommandParam()), strZ(strLstGet(cfgCommandParam(), 0)),
|
||||
strLstSize(cfgCommandParam()) == 1 ?
|
||||
"" : strZ(strNewFmt("...%s", strZ(strLstGet(cfgCommandParam(), strLstSize(cfgCommandParam()) - 1)))));
|
||||
|
||||
// Check for archive files
|
||||
ArchiveGetCheckResult checkResult = archiveGetCheck(cfgCommandParam());
|
||||
|
||||
// If any files are missing get the first one (used to construct the "unable to find" warning)
|
||||
const String *archiveFileMissing = NULL;
|
||||
|
||||
if (lstSize(checkResult.archiveFileMapList) < strLstSize(cfgCommandParam()))
|
||||
archiveFileMissing = strLstGet(cfgCommandParam(), lstSize(checkResult.archiveFileMapList));
|
||||
|
||||
// Get archive files that were found
|
||||
if (lstSize(checkResult.archiveFileMapList) > 0)
|
||||
{
|
||||
// Create the parallel executor
|
||||
ProtocolParallel *parallelExec = protocolParallelNew(
|
||||
cfgOptionUInt64(cfgOptProtocolTimeout) / 2, archiveGetAsyncCallback, &jobData);
|
||||
cfgOptionUInt64(cfgOptProtocolTimeout) / 2, archiveGetAsyncCallback, &checkResult);
|
||||
|
||||
for (unsigned int processIdx = 1; processIdx <= cfgOptionUInt(cfgOptProcessMax); processIdx++)
|
||||
protocolParallelClientAdd(parallelExec, protocolLocalGet(protocolStorageTypeRepo, 0, processIdx));
|
||||
@ -377,30 +635,22 @@ cmdArchiveGetAsync(void)
|
||||
|
||||
// The job was successful
|
||||
if (protocolParallelJobErrorCode(job) == 0)
|
||||
{
|
||||
// Get the archive file
|
||||
if (varIntForce(protocolParallelJobResult(job)) == 0)
|
||||
{
|
||||
LOG_DETAIL_PID_FMT(
|
||||
processId,
|
||||
FOUND_IN_REPO_ARCHIVE_MSG, strZ(walSegment),
|
||||
cfgOptionGroupIdxToKey(cfgOptGrpRepo, cfgOptionGroupIdxDefault(cfgOptGrpRepo)));
|
||||
}
|
||||
// If it does not exist write an ok file to indicate that it was checked
|
||||
else
|
||||
{
|
||||
LOG_DETAIL_PID_FMT(processId, UNABLE_TO_FIND_IN_ARCHIVE_MSG, strZ(walSegment));
|
||||
archiveAsyncStatusOkWrite(archiveModeGet, walSegment, NULL);
|
||||
}
|
||||
cfgOptionGroupIdxToKey(cfgOptGrpRepo, cfgOptionGroupIdxDefault(cfgOptGrpRepo)),
|
||||
strZ(checkResult.archiveId));
|
||||
}
|
||||
// Else the job errored
|
||||
else
|
||||
{
|
||||
LOG_WARN_PID_FMT(
|
||||
processId,
|
||||
COULD_NOT_GET_FROM_REPO_ARCHIVE_MSG, strZ(walSegment),
|
||||
COULD_NOT_GET_FROM_REPO_ARCHIVE_MSG " [%d] %s", strZ(walSegment),
|
||||
cfgOptionGroupIdxToKey(cfgOptGrpRepo, cfgOptionGroupIdxDefault(cfgOptGrpRepo)),
|
||||
protocolParallelJobErrorCode(job), strZ(protocolParallelJobErrorMessage(job)));
|
||||
strZ(checkResult.archiveId), protocolParallelJobErrorCode(job),
|
||||
strZ(protocolParallelJobErrorMessage(job)));
|
||||
|
||||
archiveAsyncStatusErrorWrite(
|
||||
archiveModeGet, walSegment, protocolParallelJobErrorCode(job),
|
||||
@ -412,6 +662,26 @@ cmdArchiveGetAsync(void)
|
||||
}
|
||||
while (!protocolParallelDone(parallelExec));
|
||||
}
|
||||
|
||||
// Log an error from archiveGetCheck() after any existing files have been fetched. This ordering is important because we
|
||||
// need to fetch as many valid files as possible before throwing an error.
|
||||
if (checkResult.errorType != NULL)
|
||||
{
|
||||
LOG_WARN_FMT(
|
||||
COULD_NOT_GET_FROM_REPO_ARCHIVE_MSG " [%d] %s", strZ(checkResult.errorFile),
|
||||
cfgOptionGroupIdxToKey(cfgOptGrpRepo, cfgOptionGroupIdxDefault(cfgOptGrpRepo)),
|
||||
strZ(checkResult.archiveId), errorTypeCode(checkResult.errorType), strZ(checkResult.errorMessage));
|
||||
|
||||
archiveAsyncStatusErrorWrite(
|
||||
archiveModeGet, checkResult.errorFile, errorTypeCode(checkResult.errorType), checkResult.errorMessage);
|
||||
}
|
||||
// Else log a warning if any files were missing
|
||||
else if (archiveFileMissing != NULL)
|
||||
{
|
||||
LOG_DETAIL_FMT(UNABLE_TO_FIND_IN_ARCHIVE_MSG, strZ(archiveFileMissing));
|
||||
archiveAsyncStatusOkWrite(archiveModeGet, archiveFileMissing, NULL);
|
||||
}
|
||||
}
|
||||
// On any global error write a single error file to cover all unprocessed files
|
||||
CATCH_ANY()
|
||||
{
|
||||
|
@ -36,17 +36,16 @@ archiveGetProtocol(const String *command, const VariantList *paramList, Protocol
    {
        if (strEq(command, PROTOCOL_COMMAND_ARCHIVE_GET_STR))
        {
            const String *walSegment = varStr(varLstGet(paramList, 0));
            const String *archiveFileRequest = varStr(varLstGet(paramList, 0));
            const String *archiveFileActual = varStr(varLstGet(paramList, 1));
            const CipherType cipherType = (CipherType)varUIntForce(varLstGet(paramList, 2));
            const String *cipherPassArchive = varStr(varLstGet(paramList, 3));

            // Get the repo storage in case it is remote and encryption settings need to be pulled down
            storageRepo();

            protocolServerResponse(
                server,
                VARINT(
                    archiveGetFile(
                        storageSpoolWrite(), walSegment, strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(walSegment)), true,
                        cipherType(cfgOptionStr(cfgOptRepoCipherType)), cfgOptionStrNull(cfgOptRepoCipherPass))));
                storageSpoolWrite(), archiveFileActual, strNewFmt(STORAGE_SPOOL_ARCHIVE_IN "/%s", strZ(archiveFileRequest)), true,
                cipherType, cipherPassArchive);

            protocolServerResponse(server, NULL);
        }
        else
            found = false;
@ -565,7 +565,7 @@ unit:

  # ----------------------------------------------------------------------------------------------------------------------------
  - name: archive-get
        total: 4
        total: 3
    binReq: true

    coverage:
@ -88,7 +88,7 @@ P00 INFO: archive-get command end: completed successfully
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-get 000000010000000100000001 [TEST_PATH]/db-primary/db/base/pg_xlog/RECOVERYXLOG
|
||||
------------------------------------------------------------------------------------------------------------------------------------
|
||||
P00 INFO: archive-get command begin [BACKREST-VERSION]: [000000010000000100000001, [TEST_PATH]/db-primary/db/base/pg_xlog/RECOVERYXLOG] --buffer-size=[BUFFER-SIZE] --config=[TEST_PATH]/db-primary/pgbackrest.conf --db-timeout=45 --exec-id=[EXEC-ID] --job-retry=0 --lock-path=[TEST_PATH]/db-primary/lock --log-level-console=detail --log-level-file=[LOG-LEVEL-FILE] --log-level-stderr=off --log-path=[TEST_PATH]/db-primary/log[] --no-log-timestamp --pg1-path=[TEST_PATH]/db-primary/db/base --protocol-timeout=60 --repo1-cipher-pass=<redacted> --repo1-cipher-type=aes-256-cbc --repo1-path=[TEST_PATH]/db-primary/repo --stanza=db
|
||||
P00 INFO: found 000000010000000100000001 in the repo1 archive
|
||||
P00 INFO: found 000000010000000100000001 in the repo1:9.4-1 archive
|
||||
P00 INFO: archive-get command end: completed successfully
|
||||
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-push --compress-type=lz4 --archive-async --process-max=2 [TEST_PATH]/db-primary/db/base/pg_xlog/000000010000000100000002
|
||||
@ -175,7 +175,7 @@ P00 INFO: archive-get command end: completed successfully
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-get --archive-async 00000002.history [TEST_PATH]/db-primary/db/base/pg_xlog/00000002.history
|
||||
------------------------------------------------------------------------------------------------------------------------------------
|
||||
P00 INFO: archive-get command begin [BACKREST-VERSION]: [00000002.history, [TEST_PATH]/db-primary/db/base/pg_xlog/00000002.history] --archive-async --buffer-size=[BUFFER-SIZE] --config=[TEST_PATH]/db-primary/pgbackrest.conf --db-timeout=45 --exec-id=[EXEC-ID] --job-retry=0 --lock-path=[TEST_PATH]/db-primary/lock --log-level-console=detail --log-level-file=[LOG-LEVEL-FILE] --log-level-stderr=off --log-path=[TEST_PATH]/db-primary/log[] --no-log-timestamp --pg1-path=[TEST_PATH]/db-primary/db/base --protocol-timeout=60 --repo1-cipher-pass=<redacted> --repo1-cipher-type=aes-256-cbc --repo1-path=[TEST_PATH]/db-primary/repo --spool-path=[TEST_PATH]/db-primary/spool --stanza=db
|
||||
P00 INFO: found 00000002.history in the repo1 archive
|
||||
P00 INFO: found 00000002.history in the repo1:9.4-1 archive
|
||||
P00 INFO: archive-get command end: completed successfully
|
||||
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-push [TEST_PATH]/db-primary/db/base/pg_xlog/000000010000000100000002.partial
|
||||
|
@ -83,7 +83,7 @@ P00 INFO: archive-get command end: completed successfully
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-get 000000010000000100000001 [TEST_PATH]/db-primary/db/base/pg_xlog/RECOVERYXLOG
|
||||
------------------------------------------------------------------------------------------------------------------------------------
|
||||
P00 INFO: archive-get command begin [BACKREST-VERSION]: [000000010000000100000001, [TEST_PATH]/db-primary/db/base/pg_xlog/RECOVERYXLOG] --buffer-size=[BUFFER-SIZE] --compress-level-network=1 --config=[TEST_PATH]/db-primary/pgbackrest.conf --db-timeout=45 --exec-id=[EXEC-ID] --job-retry=0 --lock-path=[TEST_PATH]/db-primary/lock --log-level-console=detail --log-level-file=[LOG-LEVEL-FILE] --log-level-stderr=off --log-path=[TEST_PATH]/db-primary/log[] --no-log-timestamp --pg1-path=[TEST_PATH]/db-primary/db/base --protocol-timeout=60 --repo1-host=backup --repo1-host-cmd=[BACKREST-BIN] --repo1-host-config=[TEST_PATH]/backup/pgbackrest.conf --repo1-host-user=[USER-1] --stanza=db
|
||||
P00 INFO: found 000000010000000100000001 in the repo1 archive
|
||||
P00 INFO: found 000000010000000100000001 in the repo1:9.4-1 archive
|
||||
P00 INFO: archive-get command end: completed successfully
|
||||
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-push --compress-type=zst --archive-async --process-max=2 [TEST_PATH]/db-primary/db/base/pg_xlog/000000010000000100000002
|
||||
@ -170,7 +170,7 @@ P00 INFO: archive-get command end: completed successfully
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-get --archive-async 00000002.history [TEST_PATH]/db-primary/db/base/pg_xlog/00000002.history
|
||||
------------------------------------------------------------------------------------------------------------------------------------
|
||||
P00 INFO: archive-get command begin [BACKREST-VERSION]: [00000002.history, [TEST_PATH]/db-primary/db/base/pg_xlog/00000002.history] --archive-async --buffer-size=[BUFFER-SIZE] --compress-level-network=1 --config=[TEST_PATH]/db-primary/pgbackrest.conf --db-timeout=45 --exec-id=[EXEC-ID] --job-retry=0 --lock-path=[TEST_PATH]/db-primary/lock --log-level-console=detail --log-level-file=[LOG-LEVEL-FILE] --log-level-stderr=off --log-path=[TEST_PATH]/db-primary/log[] --no-log-timestamp --pg1-path=[TEST_PATH]/db-primary/db/base --protocol-timeout=60 --repo1-host=backup --repo1-host-cmd=[BACKREST-BIN] --repo1-host-config=[TEST_PATH]/backup/pgbackrest.conf --repo1-host-user=[USER-1] --spool-path=[TEST_PATH]/db-primary/spool --stanza=db
|
||||
P00 INFO: found 00000002.history in the repo1 archive
|
||||
P00 INFO: found 00000002.history in the repo1:9.4-1 archive
|
||||
P00 INFO: archive-get command end: completed successfully
|
||||
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-push [TEST_PATH]/db-primary/db/base/pg_xlog/000000010000000100000002.partial
|
||||
|
@ -286,7 +286,7 @@ backrest-checksum="[CHECKSUM]"
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-get 000000010000000100000002 [TEST_PATH]/db-primary/db/base/pg_xlog/RECOVERYXLOG
|
||||
------------------------------------------------------------------------------------------------------------------------------------
|
||||
P00 INFO: archive-get command begin [BACKREST-VERSION]: [000000010000000100000002, [TEST_PATH]/db-primary/db/base/pg_xlog/RECOVERYXLOG] --buffer-size=[BUFFER-SIZE] --config=[TEST_PATH]/db-primary/pgbackrest.conf --db-timeout=45 --exec-id=[EXEC-ID] --job-retry=0 --lock-path=[TEST_PATH]/db-primary/lock --log-level-console=detail --log-level-file=[LOG-LEVEL-FILE] --log-level-stderr=off --log-path=[TEST_PATH]/db-primary/log[] --no-log-timestamp --pg1-path=[TEST_PATH]/db-primary/db/base --protocol-timeout=60 --repo1-path=[TEST_PATH]/db-primary/repo --stanza=db
|
||||
P00 INFO: found 000000010000000100000002 in the repo1 archive
|
||||
P00 INFO: found 000000010000000100000002 in the repo1:9.3-1 archive
|
||||
P00 INFO: archive-get command end: completed successfully
|
||||
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --log-level-console=warn --archive-push-queue-max=33554432 --stanza=db archive-push [TEST_PATH]/db-primary/db/base/pg_xlog/000000010000000100000001
|
||||
|
@ -296,7 +296,7 @@ backrest-checksum="[CHECKSUM]"
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --stanza=db archive-get 000000010000000100000002 [TEST_PATH]/db-primary/db/base/pg_xlog/RECOVERYXLOG
|
||||
------------------------------------------------------------------------------------------------------------------------------------
|
||||
P00 INFO: archive-get command begin [BACKREST-VERSION]: [000000010000000100000002, [TEST_PATH]/db-primary/db/base/pg_xlog/RECOVERYXLOG] --buffer-size=[BUFFER-SIZE] --compress-level-network=1 --config=[TEST_PATH]/db-primary/pgbackrest.conf --db-timeout=45 --exec-id=[EXEC-ID] --job-retry=0 --lock-path=[TEST_PATH]/db-primary/lock --log-level-console=detail --log-level-file=[LOG-LEVEL-FILE] --log-level-stderr=off --log-path=[TEST_PATH]/db-primary/log[] --no-log-timestamp --pg1-path=[TEST_PATH]/db-primary/db/base --protocol-timeout=60 --repo1-host=backup --repo1-host-cmd=[BACKREST-BIN] --repo1-host-config=[TEST_PATH]/backup/pgbackrest.conf --repo1-host-user=[USER-1] --stanza=db
|
||||
P00 INFO: found 000000010000000100000002 in the repo1 archive
|
||||
P00 INFO: found 000000010000000100000002 in the repo1:9.3-1 archive
|
||||
P00 INFO: archive-get command end: completed successfully
|
||||
|
||||
> [CONTAINER-EXEC] db-primary [BACKREST-BIN] --config=[TEST_PATH]/db-primary/pgbackrest.conf --log-level-console=warn --archive-push-queue-max=33554432 --stanza=db archive-push [TEST_PATH]/db-primary/db/base/pg_xlog/000000010000000100000001
|
||||
|
@ -23,91 +23,6 @@ testRun(void)
|
||||
|
||||
Storage *storageTest = storagePosixNewP(strNew(testPath()), .write = true);
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
if (testBegin("archiveGetCheck()"))
|
||||
{
|
||||
// Load Parameters
|
||||
StringList *argList = strLstNew();
|
||||
strLstAddZ(argList, "--stanza=test1");
|
||||
strLstAdd(argList, strNewFmt("--repo1-path=%s/repo", testPath()));
|
||||
strLstAdd(argList, strNewFmt("--pg1-path=%s/pg", testPath()));
|
||||
harnessCfgLoad(cfgCmdArchiveGet, argList);
|
||||
|
||||
// Create pg_control file
|
||||
storagePutP(
|
||||
storageNewWriteP(storageTest, strNew("pg/" PG_PATH_GLOBAL "/" PG_FILE_PGCONTROL)),
|
||||
pgControlTestToBuffer((PgControl){.version = PG_VERSION_10, .systemId = 0xFACEFACEFACEFACE}));
|
||||
|
||||
// Control and archive info mismatch
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
storagePutP(
|
||||
storageNewWriteP(storageTest, strNew("repo/archive/test1/archive.info")),
|
||||
harnessInfoChecksumZ(
|
||||
"[db]\n"
|
||||
"db-id=1\n"
|
||||
"\n"
|
||||
"[db:history]\n"
|
||||
"1={\"db-id\":5555555555555555555,\"db-version\":\"9.4\"}\n"));
|
||||
|
||||
TEST_ERROR(
|
||||
archiveGetCheck(strNew("876543218765432187654321"), cipherTypeNone, NULL), ArchiveMismatchError,
|
||||
"unable to retrieve the archive id for database version '10' and system-id '18072658121562454734'");
|
||||
|
||||
// Nothing to find in empty archive dir
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
storagePutP(
|
||||
storageNewWriteP(storageTest, strNew("repo/archive/test1/archive.info")),
|
||||
harnessInfoChecksumZ(
|
||||
"[db]\n"
|
||||
"db-id=3\n"
|
||||
"\n"
|
||||
"[db:history]\n"
|
||||
"1={\"db-id\":5555555555555555555,\"db-version\":\"9.4\"}\n"
|
||||
"2={\"db-id\":18072658121562454734,\"db-version\":\"10\"}\n"
|
||||
"3={\"db-id\":18072658121562454734,\"db-version\":\"9.6\"}\n"
|
||||
"4={\"db-id\":18072658121562454734,\"db-version\":\"10\"}"));
|
||||
|
||||
TEST_RESULT_STR(
|
||||
archiveGetCheck(strNew("876543218765432187654321"), cipherTypeNone, NULL).archiveFileActual, NULL, "no segment found");
|
||||
|
||||
// Write segment into an older archive path
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
storagePutP(
|
||||
storageNewWriteP(
|
||||
storageTest,
|
||||
strNew(
|
||||
"repo/archive/test1/10-2/8765432187654321/876543218765432187654321-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")),
|
||||
NULL);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
archiveGetCheck(strNew("876543218765432187654321"), cipherTypeNone, NULL).archiveFileActual,
|
||||
"10-2/8765432187654321/876543218765432187654321-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "segment found");
|
||||
|
||||
// Write segment into an newer archive path
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
storagePutP(
|
||||
storageNewWriteP(
|
||||
storageTest,
|
||||
strNew(
|
||||
"repo/archive/test1/10-4/8765432187654321/876543218765432187654321-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")),
|
||||
NULL);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
archiveGetCheck(strNew("876543218765432187654321"), cipherTypeNone, NULL).archiveFileActual,
|
||||
"10-4/8765432187654321/876543218765432187654321-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", "newer segment found");
|
||||
|
||||
// Get history file
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_RESULT_STR(
|
||||
archiveGetCheck(strNew("00000009.history"), cipherTypeNone, NULL).archiveFileActual, NULL, "history file not found");
|
||||
|
||||
storagePutP(storageNewWriteP(storageTest, strNew("repo/archive/test1/10-4/00000009.history")), NULL);
|
||||
|
||||
TEST_RESULT_STR_Z(
|
||||
archiveGetCheck(strNew("00000009.history"), cipherTypeNone, NULL).archiveFileActual, "10-4/00000009.history",
|
||||
"history file found");
|
||||
}
|
||||
|
||||
// *****************************************************************************************************************************
|
||||
if (testBegin("queueNeed()"))
|
||||
{
|
||||
@ -224,7 +139,7 @@ testRun(void)
|
||||
|
||||
harnessLogResult(
|
||||
"P00 INFO: get 1 WAL file(s) from archive: 000000010000000100000001\n"
|
||||
"P01 DETAIL: unable to find 000000010000000100000001 in the archive");
|
||||
"P00 DETAIL: unable to find 000000010000000100000001 in the archive");
|
||||
|
||||
TEST_STORAGE_LIST(storageSpoolWrite(), STORAGE_SPOOL_ARCHIVE_IN, "000000010000000100000001.ok\n", .remove = true);
|
||||
|
||||
@ -238,7 +153,7 @@ testRun(void)
|
||||
|
||||
harnessLogResult(
|
||||
"P00 INFO: get 1 WAL file(s) from archive: 000000010000000100000001\n"
|
||||
"P01 WARN: could not get 000000010000000100000001 from the repo1 archive (will be retried):"
|
||||
"P01 WARN: could not get 000000010000000100000001 from the repo1:10-1 archive (will be retried):"
|
||||
" [29] raised from local-1 protocol: unexpected eof in compressed data");
|
||||
|
||||
TEST_STORAGE_LIST(
|
||||
@ -261,7 +176,7 @@ testRun(void)
|
||||
|
||||
harnessLogResult(
|
||||
"P00 INFO: get 1 WAL file(s) from archive: 000000010000000100000001\n"
|
||||
"P01 DETAIL: found 000000010000000100000001 in the repo1 archive");
|
||||
"P01 DETAIL: found 000000010000000100000001 in the repo1:10-1 archive");
|
||||
|
||||
TEST_STORAGE_LIST(storageSpoolWrite(), STORAGE_SPOOL_ARCHIVE_IN, "000000010000000100000001\n", .remove = true);
|
||||
|
||||
@ -287,18 +202,34 @@ testRun(void)
|
||||
|
||||
harnessLogResult(
|
||||
"P00 INFO: get 3 WAL file(s) from archive: 0000000100000001000000FE...000000010000000200000000\n"
|
||||
"P01 DETAIL: found 0000000100000001000000FE in the repo1 archive\n"
|
||||
"P01 DETAIL: unable to find 0000000100000001000000FF in the archive\n"
|
||||
"P01 WARN: could not get 000000010000000200000000 from the repo1 archive (will be retried): "
|
||||
"[45] raised from local-1 protocol: duplicates found in archive for WAL segment 000000010000000200000000: "
|
||||
"P01 DETAIL: found 0000000100000001000000FE in the repo1:10-1 archive\n"
|
||||
"P00 DETAIL: unable to find 0000000100000001000000FF in the archive");
|
||||
|
||||
TEST_STORAGE_LIST(
|
||||
storageSpoolWrite(), STORAGE_SPOOL_ARCHIVE_IN, "0000000100000001000000FE\n0000000100000001000000FF.ok\n",
|
||||
.remove = true);
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("error on duplicates now that no segments are missing");
|
||||
|
||||
HRN_STORAGE_PUT_EMPTY(
|
||||
storageRepoWrite(), STORAGE_REPO_ARCHIVE "/10-1/0000000100000001000000FF-efefefefefefefefefefefefefefefefefefefef");
|
||||
|
||||
TEST_RESULT_VOID(cmdArchiveGetAsync(), "archive async");
|
||||
|
||||
harnessLogResult(
|
||||
"P00 INFO: get 3 WAL file(s) from archive: 0000000100000001000000FE...000000010000000200000000\n"
|
||||
"P01 DETAIL: found 0000000100000001000000FE in the repo1:10-1 archive\n"
|
||||
"P01 DETAIL: found 0000000100000001000000FF in the repo1:10-1 archive\n"
|
||||
"P00 WARN: could not get 000000010000000200000000 from the repo1:10-1 archive (will be retried): "
|
||||
"[45] duplicates found in the repo1:10-1 archive for WAL segment 000000010000000200000000: "
|
||||
"000000010000000200000000-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa, "
|
||||
"000000010000000200000000-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb\n"
|
||||
" HINT: are multiple primaries archiving to this stanza?");
|
||||
|
||||
TEST_STORAGE_LIST(
|
||||
storageSpoolWrite(), STORAGE_SPOOL_ARCHIVE_IN,
|
||||
"0000000100000001000000FE\n0000000100000001000000FF.ok\n000000010000000200000000.error\n",
|
||||
.remove = true);
|
||||
"0000000100000001000000FE\n0000000100000001000000FF\n000000010000000200000000.error\n", .remove = true);
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("global error on invalid executable");
|
||||
@ -551,7 +482,7 @@ testRun(void)
|
||||
|
||||
TEST_RESULT_INT(cmdArchiveGet(), 0, "get");
|
||||
|
||||
harnessLogResult("P00 INFO: found 01ABCDEF01ABCDEF01ABCDEF in the repo1 archive");
|
||||
harnessLogResult("P00 INFO: found 01ABCDEF01ABCDEF01ABCDEF in the repo1:10-1 archive");
|
||||
|
||||
TEST_RESULT_UINT(
|
||||
storageInfoP(storageTest, STRDEF(TEST_PATH_PG "/pg_wal/RECOVERYXLOG")).size, 16 * 1024 * 1024, "check size");
|
||||
@ -565,7 +496,7 @@ testRun(void)
|
||||
|
||||
TEST_ERROR(
|
||||
cmdArchiveGet(), ArchiveDuplicateError,
|
||||
"duplicates found in archive for WAL segment 01ABCDEF01ABCDEF01ABCDEF:"
|
||||
"duplicates found in the repo1:10-1 archive for WAL segment 01ABCDEF01ABCDEF01ABCDEF:"
|
||||
" 01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa,"
|
||||
" 01ABCDEF01ABCDEF01ABCDEF-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb\n"
|
||||
"HINT: are multiple primaries archiving to this stanza?");
|
||||
@ -589,11 +520,25 @@ testRun(void)
|
||||
|
||||
TEST_RESULT_INT(cmdArchiveGet(), 0, "get");
|
||||
|
||||
harnessLogResult("P00 INFO: found 01ABCDEF01ABCDEF01ABCDEF in the repo1 archive");
|
||||
harnessLogResult("P00 INFO: found 01ABCDEF01ABCDEF01ABCDEF in the repo1:10-1 archive");
|
||||
|
||||
TEST_STORAGE_LIST(storageTest, TEST_PATH_PG "/pg_wal", "RECOVERYXLOG\n", .remove = true);
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get from current db-id");
|
||||
|
||||
HRN_STORAGE_PUT_EMPTY(
|
||||
storageRepoWrite(), STORAGE_REPO_ARCHIVE "/10-3/01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
|
||||
|
||||
TEST_RESULT_INT(cmdArchiveGet(), 0, "get");
|
||||
|
||||
harnessLogResult("P00 INFO: found 01ABCDEF01ABCDEF01ABCDEF in the repo1:10-3 archive");
|
||||
|
||||
TEST_STORAGE_LIST(storageTest, TEST_PATH_PG "/pg_wal", "RECOVERYXLOG\n", .remove = true);
|
||||
TEST_STORAGE_REMOVE(
|
||||
storageRepoWrite(), STORAGE_REPO_ARCHIVE "/10-1/01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
|
||||
TEST_STORAGE_REMOVE(
|
||||
storageRepoWrite(), STORAGE_REPO_ARCHIVE "/10-3/01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------------
|
||||
TEST_TITLE("get partial");
|
||||
@ -614,7 +559,7 @@ testRun(void)
|
||||
|
||||
TEST_RESULT_INT(cmdArchiveGet(), 0, "get");
|
||||
|
||||
harnessLogResult("P00 INFO: found 000000010000000100000001.partial in the repo1 archive");
|
||||
harnessLogResult("P00 INFO: found 000000010000000100000001.partial in the repo1:10-3 archive");
|
||||
|
||||
TEST_STORAGE_LIST(storageTest, TEST_PATH_PG "/pg_wal", "RECOVERYXLOG\n", .remove = true);
|
||||
TEST_STORAGE_REMOVE(
|
||||
@ -642,7 +587,7 @@ testRun(void)
|
||||
|
||||
TEST_RESULT_INT(cmdArchiveGet(), 0, "get");
|
||||
|
||||
harnessLogResult("P00 INFO: found 00000001.history in the repo1 archive");
|
||||
harnessLogResult("P00 INFO: found 00000001.history in the repo1:10-1 archive");
|
||||
|
||||
TEST_RESULT_UINT(storageInfoP(storageTest, STRDEF(TEST_PATH_PG "/pg_wal/RECOVERYHISTORY")).size, 7, "check size");
|
||||
TEST_STORAGE_LIST(storageTest, TEST_PATH_PG "/pg_wal", "RECOVERYHISTORY\n", .remove = true);
|
||||
@ -677,7 +622,7 @@ testRun(void)
|
||||
|
||||
TEST_RESULT_INT(cmdArchiveGet(), 0, "get");
|
||||
|
||||
harnessLogResult("P00 INFO: found 01ABCDEF01ABCDEF01ABCDEF in the repo1 archive");
|
||||
harnessLogResult("P00 INFO: found 01ABCDEF01ABCDEF01ABCDEF in the repo1:10-1 archive");
|
||||
|
||||
TEST_STORAGE_LIST(storageTest, TEST_PATH_PG "/pg_wal", "RECOVERYXLOG\n");
|
||||
TEST_RESULT_UINT(
|
||||
@ -706,11 +651,15 @@ testRun(void)
|
||||
// Setup protocol command
|
||||
VariantList *paramList = varLstNew();
|
||||
varLstAdd(paramList, varNewStrZ("01ABCDEF01ABCDEF01ABCDEF"));
|
||||
varLstAdd(
|
||||
paramList, varNewStrZ("10-1/01ABCDEF01ABCDEF/01ABCDEF01ABCDEF01ABCDEF-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.gz"));
|
||||
varLstAdd(paramList, varNewUInt(cipherTypeAes256Cbc));
|
||||
varLstAdd(paramList, varNewStrZ(TEST_CIPHER_PASS_ARCHIVE));
|
||||
|
||||
TEST_RESULT_BOOL(
|
||||
archiveGetProtocol(PROTOCOL_COMMAND_ARCHIVE_GET_STR, paramList, server), true, "protocol archive get");
|
||||
|
||||
TEST_RESULT_STR_Z(strNewBuf(serverWrite), "{\"out\":0}\n", "check result");
|
||||
TEST_RESULT_STR_Z(strNewBuf(serverWrite), "{}\n", "check result");
|
||||
TEST_STORAGE_LIST(storageSpool(), STORAGE_SPOOL_ARCHIVE_IN, "000000010000000100000002\n01ABCDEF01ABCDEF01ABCDEF\n");
|
||||
|
||||
bufUsedSet(serverWrite, 0);
|