1
0
mirror of https://github.com/pgbackrest/pgbackrest.git synced 2025-03-05 15:05:48 +02:00

More balanced backup.

This commit is contained in:
David Steele 2014-02-12 23:49:40 -05:00
parent b770c8ebec
commit 7c4d463b1d

View File

@ -405,8 +405,10 @@ sub backup_file
# Hash table used to store files for parallel copy
my $lTablespaceIdx = 0;
my $lFileTotal = 0;
my $lFileZeroTotal = 0;
my $lFileSizeTotal = 0;
my $lFileLargeSize = 0;
my $lFileLargeTotal = 0;
my $lFileSmallSize = 0;
my $lFileSmallTotal = 0;
# Iterate through the path sections of the manifest to backup
my $strSectionPath;
@ -506,8 +508,10 @@ sub backup_file
my $lFileSize = ${$oBackupManifestRef}{"${strSectionFile}"}{"$strFile"}{size};
$lFileTotal++;
$lFileSizeTotal += $lFileSize;
$lFileZeroTotal += $lFileSize == 0 ? 1 : 0;
$lFileLargeSize += $lFileSize > 8192 ? $lFileSize : 0;
$lFileLargeTotal += $lFileSize > 8192 ? 1 : 0;
$lFileSmallSize += $lFileSize <= 8192 ? $lFileSize : 0;
$lFileSmallTotal += $lFileSize <= 8192 ? 1 : 0;
my $strKey = sprintf("ts%012x-fs%012x-fn%012x", $lTablespaceIdx,
$lFileSize, $lFileTotal);
@ -522,51 +526,79 @@ sub backup_file
}
# Build the thread queues
my $iThreadIdx;
my @oThread;
for ($iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
{
$oThreadFile[$iThreadIdx] = $oFile->clone($iThreadIdx);
$oThreadQueue[$iThreadIdx] = Thread::Queue->new();
}
# Assign files to each thread queue
$iThreadIdx = 0;
my $fThreadFileSizeMax = $lFileSizeTotal / $iThreadTotal;
my $fThreadFileSize = 0;
my $iThreadFileTotal = 0;
my $iThreadFileSmallIdx = 0;
my $iThreadFileSmallTotalMax = $lFileSmallTotal / $iThreadTotal;
my $fThreadFileSmallSize = 0;
my $iThreadFileSmallTotal = 0;
my $iThreadFileLargeIdx = 0;
my $fThreadFileLargeSizeMax = $lFileLargeSize / $iThreadTotal;
my $fThreadFileLargeSize = 0;
my $iThreadFileLargeTotal = 0;
&log(DEBUG, " files ${lFileTotal}, zero files ${lFileZeroTotal}, file size: ${lFileSizeTotal}, per thread ${fThreadFileSizeMax}");
&log(DEBUG, " files ${lFileTotal}, " .
"small total ${lFileSmallTotal}, small size: ${lFileSmallSize}, small thread avg total ${iThreadFileSmallTotalMax} " .
"large total ${lFileLargeTotal}, large size: ${lFileLargeSize}, large thread avg size ${fThreadFileLargeSizeMax}");
foreach my $strFile (sort {$b cmp $a} (keys %oFileCopyMap))
{
$oThreadQueue[$iThreadIdx]->enqueue($strFile);
$fThreadFileSize += $oFileCopyMap{"${strFile}"}{size};
$iThreadFileTotal++;
if ($fThreadFileSize >= $fThreadFileSizeMax && $iThreadIdx < $iThreadTotal - 1)
{
&log(DEBUG, " thread ${iThreadIdx} files ${iThreadFileTotal} size ${fThreadFileSize}");
my $lFileSize = $oFileCopyMap{"${strFile}"}{size};
$iThreadIdx++;
$iThreadFileTotal = 0;
$fThreadFileSize = 0;
if ($lFileSize > 8192)
{
$oThreadQueue[$iThreadFileLargeIdx]->enqueue($strFile);
$fThreadFileLargeSize += $lFileSize;
$iThreadFileLargeTotal++;
if ($fThreadFileLargeSize >= $fThreadFileLargeSizeMax && $iThreadFileLargeIdx < $iThreadTotal - 1)
{
&log(DEBUG, " thread ${iThreadFileLargeIdx} large total ${iThreadFileLargeTotal}, size ${fThreadFileLargeSize}");
$iThreadFileLargeIdx++;
$fThreadFileLargeSize = 0;
$iThreadFileLargeTotal = 0;
}
}
else
{
$oThreadQueue[$iThreadFileSmallIdx]->enqueue($strFile);
$fThreadFileSmallSize += $lFileSize;
$iThreadFileSmallTotal++;
if ($iThreadFileSmallTotal >= $iThreadFileSmallTotalMax && $iThreadFileSmallIdx < $iThreadTotal - 1)
{
&log(DEBUG, " thread ${iThreadFileSmallIdx} small total ${iThreadFileSmallTotal}, size ${fThreadFileSmallSize}");
$iThreadFileSmallIdx++;
$fThreadFileSmallSize = 0;
$iThreadFileSmallTotal = 0;
}
}
}
&log(DEBUG, " thread ${iThreadIdx} files ${iThreadFileTotal} size ${fThreadFileSize}");
&log(DEBUG, " thread ${iThreadFileLargeIdx} large total ${iThreadFileLargeTotal}, size ${fThreadFileLargeSize}");
&log(DEBUG, " thread ${iThreadFileSmallIdx} small total ${iThreadFileSmallTotal}, size ${fThreadFileSmallSize}");
# End each thread queue and start the thread
for ($iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
{
$oThreadQueue[$iThreadIdx]->enqueue(undef);
$oThread[$iThreadIdx] = threads->create(\&backup_file_thread, $iThreadIdx, $bNoChecksum);
}
# Rejoin the threads
for ($iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
for (my $iThreadIdx = 0; $iThreadIdx < $iThreadTotal; $iThreadIdx++)
{
$oThread[$iThreadIdx]->join();
}