mirror of
https://github.com/rclone/rclone.git
synced 2025-08-10 06:09:44 +02:00
march: fix deadlock when using --no-traverse - fixes #8656
This occurred whenever there were more than 100 files in the source, due to the output channel filling up. The fix is not to use list.NewSorter but to take more care to output the dst objects in the same order the src objects are delivered. As the src objects are delivered sorted, no sorting is needed. In order not to cause another deadlock, we need to send nil dst objects, which is safe since this adjusts the termination conditions for the channels. Thanks to @jeremy for the test script that the Go tests are based on.
This commit is contained in:
@@ -16,7 +16,6 @@ import (
|
||||
"github.com/rclone/rclone/fs/list"
|
||||
"github.com/rclone/rclone/fs/walk"
|
||||
"github.com/rclone/rclone/lib/transform"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/text/unicode/norm"
|
||||
)
|
||||
|
||||
@@ -291,6 +290,7 @@ func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstO
|
||||
srcPrev, dstPrev fs.DirEntry
|
||||
srcPrevName, dstPrevName string
|
||||
src, dst fs.DirEntry
|
||||
srcHasMore, dstHasMore = true, true
|
||||
srcName, dstName string
|
||||
)
|
||||
srcDone := func() {
|
||||
@@ -311,14 +311,14 @@ func (m *March) matchListings(srcChan, dstChan <-chan fs.DirEntry, srcOnly, dstO
|
||||
}
|
||||
// Reload src and dst if needed - we set them to nil if used
|
||||
if src == nil {
|
||||
src = <-srcChan
|
||||
src, srcHasMore = <-srcChan
|
||||
srcName = m.srcKey(src)
|
||||
}
|
||||
if dst == nil {
|
||||
dst = <-dstChan
|
||||
dst, dstHasMore = <-dstChan
|
||||
dstName = m.dstKey(dst)
|
||||
}
|
||||
if src == nil && dst == nil {
|
||||
if !srcHasMore && !dstHasMore {
|
||||
break
|
||||
}
|
||||
if src != nil && srcPrev != nil {
|
||||
@@ -419,38 +419,65 @@ func (m *March) processJob(job listDirJob) ([]listDirJob, error) {
|
||||
// If NoTraverse is set, then try to find a matching object
|
||||
// for each item in the srcList to head dst object
|
||||
if m.NoTraverse && !m.NoCheckDest {
|
||||
startedDst = true
|
||||
workers := ci.Checkers
|
||||
originalSrcChan := srcChan
|
||||
srcChan = make(chan fs.DirEntry, 100)
|
||||
ls, err := list.NewSorter(m.Ctx, m.Fdst, list.SortToChan(dstChan), m.dstKey)
|
||||
|
||||
type matchTask struct {
|
||||
src fs.DirEntry // src object to find in destination
|
||||
dstMatch chan<- fs.DirEntry // channel to receive matching dst object or nil
|
||||
}
|
||||
matchTasks := make(chan matchTask, workers)
|
||||
dstMatches := make(chan (<-chan fs.DirEntry), workers)
|
||||
|
||||
// Create the tasks from the originalSrcChan. These are put into matchTasks for
|
||||
// processing and dstMatches so they can be retrieved in order.
|
||||
go func() {
|
||||
for src := range originalSrcChan {
|
||||
srcChan <- src
|
||||
dstMatch := make(chan fs.DirEntry, 1)
|
||||
matchTasks <- matchTask{
|
||||
src: src,
|
||||
dstMatch: dstMatch,
|
||||
}
|
||||
dstMatches <- dstMatch
|
||||
}
|
||||
close(matchTasks)
|
||||
}()
|
||||
|
||||
// Get the tasks from the queue and find a matching object.
|
||||
var workerWg sync.WaitGroup
|
||||
for range workers {
|
||||
workerWg.Add(1)
|
||||
go func() {
|
||||
defer workerWg.Done()
|
||||
for t := range matchTasks {
|
||||
leaf := path.Base(t.src.Remote())
|
||||
dst, err := m.Fdst.NewObject(m.Ctx, path.Join(job.dstRemote, leaf))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
dst = nil
|
||||
}
|
||||
t.dstMatch <- dst
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
startedDst = true
|
||||
// Close dstMatches when all the workers have finished
|
||||
go func() {
|
||||
workerWg.Wait()
|
||||
close(dstMatches)
|
||||
}()
|
||||
|
||||
// Read the matches in order and send them to dstChan if found.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer ls.CleanUp()
|
||||
|
||||
g, gCtx := errgroup.WithContext(m.Ctx)
|
||||
g.SetLimit(ci.Checkers)
|
||||
for src := range originalSrcChan {
|
||||
srcChan <- src
|
||||
if srcObj, ok := src.(fs.Object); ok {
|
||||
g.Go(func() error {
|
||||
leaf := path.Base(srcObj.Remote())
|
||||
dstObj, err := m.Fdst.NewObject(gCtx, path.Join(job.dstRemote, leaf))
|
||||
if err == nil {
|
||||
_ = ls.Add(fs.DirEntries{dstObj}) // ignore errors
|
||||
}
|
||||
return nil // ignore errors
|
||||
})
|
||||
}
|
||||
}
|
||||
dstListErr = g.Wait()
|
||||
sendErr := ls.Send()
|
||||
if dstListErr == nil {
|
||||
dstListErr = sendErr
|
||||
for dstMatch := range dstMatches {
|
||||
dst := <-dstMatch
|
||||
// Note that dst may be nil here
|
||||
// We send these on so we don't deadlock the reader
|
||||
dstChan <- dst
|
||||
}
|
||||
close(srcChan)
|
||||
close(dstChan)
|
||||
|
@@ -216,6 +216,35 @@ func TestCopyNoTraverse(t *testing.T) {
|
||||
r.CheckRemoteItems(t, file1)
|
||||
}
|
||||
|
||||
func TestCopyNoTraverseDeadlock(t *testing.T) {
|
||||
r := fstest.NewRun(t)
|
||||
if !r.Fremote.Features().IsLocal {
|
||||
t.Skip("Only runs on local")
|
||||
}
|
||||
const nFiles = 200
|
||||
t1 := fstest.Time("2001-02-03T04:05:06.499999999Z")
|
||||
|
||||
// Create lots of source files.
|
||||
items := make([]fstest.Item, nFiles)
|
||||
for i := range items {
|
||||
name := fmt.Sprintf("file%d.txt", i)
|
||||
items[i] = r.WriteFile(name, fmt.Sprintf("content%d", i), t1)
|
||||
}
|
||||
r.CheckLocalItems(t, items...)
|
||||
|
||||
// Set --no-traverse
|
||||
ctx, ci := fs.AddConfig(context.Background())
|
||||
ci.NoTraverse = true
|
||||
|
||||
// Initial copy to establish destination.
|
||||
require.NoError(t, CopyDir(ctx, r.Fremote, r.Flocal, false))
|
||||
r.CheckRemoteItems(t, items...)
|
||||
|
||||
// Second copy which shouldn't deadlock
|
||||
require.NoError(t, CopyDir(ctx, r.Flocal, r.Fremote, false))
|
||||
r.CheckRemoteItems(t, items...)
|
||||
}
|
||||
|
||||
// Now with --check-first
|
||||
func TestCopyCheckFirst(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
Reference in New Issue
Block a user