From d4213c0ac5aa27dccb16b983b40c094176a0d552 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 24 May 2018 15:03:57 +0100 Subject: [PATCH] sftp: Fix slow downloads for long latency connections - fixes #1158 This was caused by using the sftp.File.Read method which resets the streaming window after each call. Replacing it with sftp.File.WriteTo and an io.Pipe fixes the problem bringing the speed to the same as the sftp binary. --- backend/sftp/sftp.go | 51 +++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go index 1fe1553f7..176e24dd9 100644 --- a/backend/sftp/sftp.go +++ b/backend/sftp/sftp.go @@ -125,12 +125,6 @@ type Object struct { sha1sum *string // Cached SHA1 checksum } -// ObjectReader holds the sftp.File interface to a remote SFTP file opened for reading -type ObjectReader struct { - object *Object - sftpFile *sftp.File -} - // readCurrentUser finds the current user name or "" if not found func readCurrentUser() (userName string) { usr, err := user.Current() @@ -877,15 +871,49 @@ func (o *Object) Storable() bool { return o.mode.IsRegular() } +// objectReader represents a file open for reading on the SFTP server +type objectReader struct { + sftpFile *sftp.File + pipeReader *io.PipeReader + done chan struct{} +} + +func newObjectReader(sftpFile *sftp.File) *objectReader { + pipeReader, pipeWriter := io.Pipe() + file := &objectReader{ + sftpFile: sftpFile, + pipeReader: pipeReader, + done: make(chan struct{}), + } + + go func() { + // Use sftpFile.WriteTo to pump data so that it gets a + // chance to build the window up. + _, err := sftpFile.WriteTo(pipeWriter) + // Close the pipeWriter so the pipeReader fails with + // the same error or EOF if err == nil + _ = pipeWriter.CloseWithError(err) + // signal that we've finished + close(file.done) + }() + + return file +} + // Read from a remote sftp file object reader -func (file *ObjectReader) Read(p []byte) (n int, err error) { - n, err = file.sftpFile.Read(p) +func (file *objectReader) Read(p []byte) (n int, err error) { + n, err = file.pipeReader.Read(p) return n, err } // Close a reader of a remote sftp file -func (file *ObjectReader) Close() (err error) { +func (file *objectReader) Close() (err error) { + // Close the sftpFile - this will likely cause the WriteTo to error err = file.sftpFile.Close() + // Close the pipeReader so writes to the pipeWriter fail + _ = file.pipeReader.Close() + // Wait for the background process to finish + <-file.done return err } @@ -919,10 +947,7 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { return nil, errors.Wrap(err, "Open Seek failed") } } - in = readers.NewLimitedReadCloser(&ObjectReader{ - object: o, - sftpFile: sftpFile, - }, limit) + in = readers.NewLimitedReadCloser(newObjectReader(sftpFile), limit) return in, nil }