mirror of
https://github.com/rclone/rclone.git
synced 2025-10-06 05:47:10 +02:00
accounting: add SetMaxCompletedTransfers method to fix bisync race #8815
Before this change bisync adjusted the global MaxCompletedTransfers variable which caused races. This adds a SetMaxCompletedTransfers method and uses it in bisync. The MaxCompletedTransfers global becomes the default. This can be changed externally if rclone is in use as a library, and the commit history indicates that MaxCompletedTransfers was added for exactly this purpose so we try not to break it here.
This commit is contained in:
@@ -245,10 +245,8 @@ func (b *bisyncRun) fastCopy(ctx context.Context, fsrc, fdst fs.Fs, files bilib.
|
||||
}
|
||||
}
|
||||
|
||||
b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
|
||||
if accounting.MaxCompletedTransfers != -1 {
|
||||
accounting.MaxCompletedTransfers = -1 // we need a complete list in the event of graceful shutdown
|
||||
}
|
||||
b.SyncCI = fs.GetConfig(ctxCopy) // allows us to request graceful shutdown
|
||||
accounting.Stats(ctxCopy).SetMaxCompletedTransfers(-1) // we need a complete list in the event of graceful shutdown
|
||||
ctxCopy, b.CancelSync = context.WithCancel(ctxCopy)
|
||||
b.testFn()
|
||||
err := sync.Sync(ctxCopy, fdst, fsrc, b.opt.CreateEmptySrcDirs)
|
||||
|
@@ -22,48 +22,52 @@ const (
|
||||
averageStopAfter = time.Minute
|
||||
)
|
||||
|
||||
// MaxCompletedTransfers specifies maximum number of completed transfers in startedTransfers list
|
||||
// MaxCompletedTransfers specifies the default maximum number of
|
||||
// completed transfers in startedTransfers list. This can be adjusted
|
||||
// for a given StatsInfo by calling the SetMaxCompletedTransfers
|
||||
// method.
|
||||
var MaxCompletedTransfers = 100
|
||||
|
||||
// StatsInfo accounts all transfers
|
||||
// N.B.: if this struct is modified, please remember to also update sum() function in stats_groups
|
||||
// to correctly count the updated fields
|
||||
type StatsInfo struct {
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
ci *fs.ConfigInfo
|
||||
bytes int64
|
||||
errors int64
|
||||
lastError error
|
||||
fatalError bool
|
||||
retryError bool
|
||||
retryAfter time.Time
|
||||
checks int64
|
||||
checking *transferMap
|
||||
checkQueue int
|
||||
checkQueueSize int64
|
||||
transfers int64
|
||||
transferring *transferMap
|
||||
transferQueue int
|
||||
transferQueueSize int64
|
||||
listed int64
|
||||
renames int64
|
||||
renameQueue int
|
||||
renameQueueSize int64
|
||||
deletes int64
|
||||
deletesSize int64
|
||||
deletedDirs int64
|
||||
inProgress *inProgress
|
||||
startedTransfers []*Transfer // currently active transfers
|
||||
oldTimeRanges timeRanges // a merged list of time ranges for the transfers
|
||||
oldDuration time.Duration // duration of transfers we have culled
|
||||
group string
|
||||
startTime time.Time // the moment these stats were initialized or reset
|
||||
average averageValues
|
||||
serverSideCopies int64
|
||||
serverSideCopyBytes int64
|
||||
serverSideMoves int64
|
||||
serverSideMoveBytes int64
|
||||
mu sync.RWMutex
|
||||
ctx context.Context
|
||||
ci *fs.ConfigInfo
|
||||
bytes int64
|
||||
errors int64
|
||||
lastError error
|
||||
fatalError bool
|
||||
retryError bool
|
||||
retryAfter time.Time
|
||||
checks int64
|
||||
checking *transferMap
|
||||
checkQueue int
|
||||
checkQueueSize int64
|
||||
transfers int64
|
||||
transferring *transferMap
|
||||
transferQueue int
|
||||
transferQueueSize int64
|
||||
listed int64
|
||||
renames int64
|
||||
renameQueue int
|
||||
renameQueueSize int64
|
||||
deletes int64
|
||||
deletesSize int64
|
||||
deletedDirs int64
|
||||
inProgress *inProgress
|
||||
startedTransfers []*Transfer // currently active transfers
|
||||
oldTimeRanges timeRanges // a merged list of time ranges for the transfers
|
||||
oldDuration time.Duration // duration of transfers we have culled
|
||||
group string
|
||||
startTime time.Time // the moment these stats were initialized or reset
|
||||
average averageValues
|
||||
serverSideCopies int64
|
||||
serverSideCopyBytes int64
|
||||
serverSideMoves int64
|
||||
serverSideMoveBytes int64
|
||||
maxCompletedTransfers int
|
||||
}
|
||||
|
||||
type averageValues struct {
|
||||
@@ -81,17 +85,26 @@ type averageValues struct {
|
||||
func NewStats(ctx context.Context) *StatsInfo {
|
||||
ci := fs.GetConfig(ctx)
|
||||
s := &StatsInfo{
|
||||
ctx: ctx,
|
||||
ci: ci,
|
||||
checking: newTransferMap(ci.Checkers, "checking"),
|
||||
transferring: newTransferMap(ci.Transfers, "transferring"),
|
||||
inProgress: newInProgress(ctx),
|
||||
startTime: time.Now(),
|
||||
average: averageValues{},
|
||||
ctx: ctx,
|
||||
ci: ci,
|
||||
checking: newTransferMap(ci.Checkers, "checking"),
|
||||
transferring: newTransferMap(ci.Transfers, "transferring"),
|
||||
inProgress: newInProgress(ctx),
|
||||
startTime: time.Now(),
|
||||
average: averageValues{},
|
||||
maxCompletedTransfers: MaxCompletedTransfers,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// SetMaxCompletedTransfers sets the maximum number of completed transfers to keep.
|
||||
func (s *StatsInfo) SetMaxCompletedTransfers(n int) *StatsInfo {
|
||||
s.mu.Lock()
|
||||
s.maxCompletedTransfers = n
|
||||
s.mu.Unlock()
|
||||
return s
|
||||
}
|
||||
|
||||
// RemoteStats returns stats for rc
|
||||
//
|
||||
// If short is true then the transfers and checkers won't be added.
|
||||
@@ -914,13 +927,14 @@ func (s *StatsInfo) RemoveTransfer(transfer *Transfer) {
|
||||
// PruneTransfers makes sure there aren't too many old transfers by removing
|
||||
// a single finished transfer. Returns true if it removed a transfer.
|
||||
func (s *StatsInfo) PruneTransfers() bool {
|
||||
if MaxCompletedTransfers < 0 {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.maxCompletedTransfers < 0 {
|
||||
return false
|
||||
}
|
||||
removed := false
|
||||
s.mu.Lock()
|
||||
// remove a transfer from the start if we are over quota
|
||||
if len(s.startedTransfers) > MaxCompletedTransfers+s.ci.Transfers {
|
||||
if len(s.startedTransfers) > s.maxCompletedTransfers+s.ci.Transfers {
|
||||
for i, tr := range s.startedTransfers {
|
||||
if tr.IsDone() {
|
||||
s._removeTransfer(tr, i)
|
||||
@@ -929,7 +943,6 @@ func (s *StatsInfo) PruneTransfers() bool {
|
||||
}
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return removed
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user