mirror of
https://github.com/rclone/rclone.git
synced 2025-02-14 21:23:01 +02:00
Implement --max-transfer flag to quit transferring at a limit #1655
This commit is contained in:
parent
d178233e74
commit
5c128272fd
@ -536,6 +536,13 @@ files not recursed through are considered excluded and will be deleted
|
|||||||
on the destination. Test first with `--dry-run` if you are not sure
|
on the destination. Test first with `--dry-run` if you are not sure
|
||||||
what will happen.
|
what will happen.
|
||||||
|
|
||||||
|
### --max-transfer=SIZE ###
|
||||||
|
|
||||||
|
Rclone will stop transferring when it has reached the size specified.
|
||||||
|
Defaults to off.
|
||||||
|
|
||||||
|
When the limit is reached all transfers will stop immediately.
|
||||||
|
|
||||||
### --modify-window=TIME ###
|
### --modify-window=TIME ###
|
||||||
|
|
||||||
When checking whether a file has been modified, this is the maximum
|
When checking whether a file has been modified, this is the maximum
|
||||||
|
@ -10,8 +10,14 @@ import (
|
|||||||
"github.com/VividCortex/ewma"
|
"github.com/VividCortex/ewma"
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
"github.com/ncw/rclone/fs/asyncreader"
|
"github.com/ncw/rclone/fs/asyncreader"
|
||||||
|
"github.com/ncw/rclone/fs/fserrors"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrorMaxTransferLimitReached is returned from Read when the max
|
||||||
|
// transfer limit is reached.
|
||||||
|
var ErrorMaxTransferLimitReached = fserrors.FatalError(errors.New("Max transfer limit reached as set by --max-transfer"))
|
||||||
|
|
||||||
// Account limits and accounts for one transfer
|
// Account limits and accounts for one transfer
|
||||||
type Account struct {
|
type Account struct {
|
||||||
// The mutex is to make sure Read() and Close() aren't called
|
// The mutex is to make sure Read() and Close() aren't called
|
||||||
@ -27,6 +33,7 @@ type Account struct {
|
|||||||
name string
|
name string
|
||||||
statmu sync.Mutex // Separate mutex for stat values.
|
statmu sync.Mutex // Separate mutex for stat values.
|
||||||
bytes int64 // Total number of bytes read
|
bytes int64 // Total number of bytes read
|
||||||
|
max int64 // if >=0 the max number of bytes to transfer
|
||||||
start time.Time // Start time of first read
|
start time.Time // Start time of first read
|
||||||
lpTime time.Time // Time of last average measurement
|
lpTime time.Time // Time of last average measurement
|
||||||
lpBytes int // Number of bytes read since last measurement
|
lpBytes int // Number of bytes read since last measurement
|
||||||
@ -48,6 +55,7 @@ func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account {
|
|||||||
exit: make(chan struct{}),
|
exit: make(chan struct{}),
|
||||||
avg: ewma.NewMovingAverage(),
|
avg: ewma.NewMovingAverage(),
|
||||||
lpTime: time.Now(),
|
lpTime: time.Now(),
|
||||||
|
max: int64(fs.Config.MaxTransfer),
|
||||||
}
|
}
|
||||||
go acc.averageLoop()
|
go acc.averageLoop()
|
||||||
Stats.inProgress.set(acc.name, acc)
|
Stats.inProgress.set(acc.name, acc)
|
||||||
@ -131,8 +139,12 @@ func (acc *Account) averageLoop() {
|
|||||||
|
|
||||||
// read bytes from the io.Reader passed in and account them
|
// read bytes from the io.Reader passed in and account them
|
||||||
func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
|
func (acc *Account) read(in io.Reader, p []byte) (n int, err error) {
|
||||||
// Set start time.
|
|
||||||
acc.statmu.Lock()
|
acc.statmu.Lock()
|
||||||
|
if acc.max >= 0 && Stats.GetBytes() >= acc.max {
|
||||||
|
acc.statmu.Unlock()
|
||||||
|
return 0, ErrorMaxTransferLimitReached
|
||||||
|
}
|
||||||
|
// Set start time.
|
||||||
if acc.start.IsZero() {
|
if acc.start.IsZero() {
|
||||||
acc.start = time.Now()
|
acc.start = time.Now()
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,9 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
"github.com/ncw/rclone/fs/asyncreader"
|
"github.com/ncw/rclone/fs/asyncreader"
|
||||||
|
"github.com/ncw/rclone/fs/fserrors"
|
||||||
"github.com/ncw/rclone/fstest/mockobject"
|
"github.com/ncw/rclone/fstest/mockobject"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -181,3 +183,28 @@ func TestAccountAccounter(t *testing.T) {
|
|||||||
assert.True(t, wrap(in3) == in3)
|
assert.True(t, wrap(in3) == in3)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAccountMaxTransfer(t *testing.T) {
|
||||||
|
old := fs.Config.MaxTransfer
|
||||||
|
fs.Config.MaxTransfer = 15
|
||||||
|
defer func() {
|
||||||
|
fs.Config.MaxTransfer = old
|
||||||
|
}()
|
||||||
|
Stats.ResetCounters()
|
||||||
|
|
||||||
|
in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100)))
|
||||||
|
acc := NewAccountSizeName(in, 1, "test")
|
||||||
|
|
||||||
|
var b = make([]byte, 10)
|
||||||
|
|
||||||
|
n, err := acc.Read(b)
|
||||||
|
assert.Equal(t, 10, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
n, err = acc.Read(b)
|
||||||
|
assert.Equal(t, 10, n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
n, err = acc.Read(b)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
assert.Equal(t, ErrorMaxTransferLimitReached, err)
|
||||||
|
assert.True(t, fserrors.IsFatalError(err))
|
||||||
|
}
|
||||||
|
@ -95,6 +95,13 @@ func (s *StatsInfo) Bytes(bytes int64) {
|
|||||||
s.bytes += bytes
|
s.bytes += bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetBytes returns the number of bytes transferred so far
|
||||||
|
func (s *StatsInfo) GetBytes() int64 {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
return s.bytes
|
||||||
|
}
|
||||||
|
|
||||||
// Errors updates the stats for errors
|
// Errors updates the stats for errors
|
||||||
func (s *StatsInfo) Errors(errors int64) {
|
func (s *StatsInfo) Errors(errors int64) {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
|
@ -71,6 +71,7 @@ type ConfigInfo struct {
|
|||||||
StatsFileNameLength int
|
StatsFileNameLength int
|
||||||
AskPassword bool
|
AskPassword bool
|
||||||
UseServerModTime bool
|
UseServerModTime bool
|
||||||
|
MaxTransfer SizeSuffix
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfig creates a new config with everything set to the default
|
// NewConfig creates a new config with everything set to the default
|
||||||
@ -98,6 +99,7 @@ func NewConfig() *ConfigInfo {
|
|||||||
c.StatsFileNameLength = 40
|
c.StatsFileNameLength = 40
|
||||||
c.AskPassword = true
|
c.AskPassword = true
|
||||||
c.TPSLimitBurst = 1
|
c.TPSLimitBurst = 1
|
||||||
|
c.MaxTransfer = -1
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
|
|||||||
flags.FVarP(flagSet, &fs.Config.BufferSize, "buffer-size", "", "Buffer size when copying files.")
|
flags.FVarP(flagSet, &fs.Config.BufferSize, "buffer-size", "", "Buffer size when copying files.")
|
||||||
flags.FVarP(flagSet, &fs.Config.StreamingUploadCutoff, "streaming-upload-cutoff", "", "Cutoff for switching to chunked upload if file size is unknown. Upload starts after reaching cutoff or when file ends.")
|
flags.FVarP(flagSet, &fs.Config.StreamingUploadCutoff, "streaming-upload-cutoff", "", "Cutoff for switching to chunked upload if file size is unknown. Upload starts after reaching cutoff or when file ends.")
|
||||||
flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
|
flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
|
||||||
|
flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetFlags converts any flags into config which weren't straight foward
|
// SetFlags converts any flags into config which weren't straight foward
|
||||||
|
Loading…
x
Reference in New Issue
Block a user