1
0
mirror of https://github.com/rclone/rclone.git synced 2025-01-13 20:38:12 +02:00
rclone/cache/handle.go
2017-11-15 15:23:21 +00:00

437 lines
11 KiB
Go

// +build !plan9
package cache
import (
"fmt"
"io"
"os"
"sync"
"time"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
)
// Handle is managing the read/write/seek operations on an open handle
type Handle struct {
cachedObject *Object
memory ChunkStorage
preloadQueue chan int64
preloadOffset int64
offset int64
seenOffsets map[int64]bool
mu sync.Mutex
ReadRetries int
TotalWorkers int
UseMemory bool
workers []*worker
chunkAge time.Duration
warmup bool
closed bool
}
// NewObjectHandle returns a new Handle for an existing Object
func NewObjectHandle(o *Object) *Handle {
r := &Handle{
cachedObject: o,
offset: 0,
preloadOffset: -1, // -1 to trigger the first preload
ReadRetries: o.CacheFs.readRetries,
TotalWorkers: o.CacheFs.totalWorkers,
UseMemory: o.CacheFs.chunkMemory,
chunkAge: o.CacheFs.chunkAge,
warmup: o.CacheFs.InWarmUp(),
}
r.seenOffsets = make(map[int64]bool)
r.memory = NewMemory(o.CacheFs.chunkAge)
if o.CacheFs.InWarmUp() {
r.chunkAge = o.CacheFs.metaAge
}
// create a larger buffer to queue up requests
r.preloadQueue = make(chan int64, r.TotalWorkers*10)
r.startReadWorkers()
return r
}
// cacheFs is a convenience method to get the parent cache FS of the object's manager
func (r *Handle) cacheFs() *Fs {
return r.cachedObject.CacheFs
}
// storage is a convenience method to get the persistent storage of the object's manager
func (r *Handle) storage() Storage {
return r.cacheFs().cache
}
// String representation of this reader
func (r *Handle) String() string {
return r.cachedObject.abs()
}
// InWarmUp says if this handle is in warmup mode
func (r *Handle) InWarmUp() bool {
return r.warmup
}
// startReadWorkers will start the worker pool
func (r *Handle) startReadWorkers() {
if r.hasAtLeastOneWorker() {
return
}
for i := 0; i < r.TotalWorkers; i++ {
w := &worker{
r: r,
ch: r.preloadQueue,
id: i,
}
go w.run()
r.workers = append(r.workers, w)
}
}
// queueOffset will send an offset to the workers if it's different from the last one
func (r *Handle) queueOffset(offset int64) {
if offset != r.preloadOffset {
r.preloadOffset = offset
previousChunksCounter := 0
maxOffset := r.cacheFs().chunkSize * int64(r.cacheFs().originalSettingWorkers())
// clear the past seen chunks
// they will remain in our persistent storage but will be removed from transient
// so they need to be picked up by a worker
for k := range r.seenOffsets {
if k < offset {
r.seenOffsets[k] = false
// we count how many continuous chunks were seen before
if offset >= maxOffset && k >= offset-maxOffset {
previousChunksCounter++
}
}
}
// if we read all the previous chunks that could have been preloaded
// we should then disable warm up setting for this handle
if r.warmup && previousChunksCounter >= r.cacheFs().originalSettingWorkers() {
r.TotalWorkers = r.cacheFs().originalSettingWorkers()
r.UseMemory = !r.cacheFs().originalSettingChunkNoMemory()
r.chunkAge = r.cacheFs().chunkAge
r.warmup = false
fs.Infof(r, "disabling warm up")
}
for i := 0; i < r.TotalWorkers; i++ {
o := r.preloadOffset + r.cacheFs().chunkSize*int64(i)
if o < 0 || o >= r.cachedObject.Size() {
continue
}
if v, ok := r.seenOffsets[o]; ok && v {
continue
}
r.seenOffsets[o] = true
r.preloadQueue <- o
}
}
}
func (r *Handle) hasAtLeastOneWorker() bool {
oneWorker := false
for i := 0; i < len(r.workers); i++ {
if r.workers[i].isRunning() {
oneWorker = true
}
}
return oneWorker
}
// getChunk is called by the FS to retrieve a specific chunk of known start and size from where it can find it
// it can be from transient or persistent cache
// it will also build the chunk from the cache's specific chunk boundaries and build the final desired chunk in a buffer
func (r *Handle) getChunk(chunkStart int64) ([]byte, error) {
var data []byte
var err error
// we reached the end of the file
if chunkStart >= r.cachedObject.Size() {
fs.Debugf(r, "reached EOF %v", chunkStart)
return nil, io.EOF
}
// we calculate the modulus of the requested offset with the size of a chunk
offset := chunkStart % r.cacheFs().chunkSize
// we align the start offset of the first chunk to a likely chunk in the storage
chunkStart = chunkStart - offset
r.queueOffset(chunkStart)
found := false
// delete old chunks from memory
if r.UseMemory {
go r.memory.CleanChunksByNeed(chunkStart)
data, err = r.memory.GetChunk(r.cachedObject, chunkStart)
if err == nil {
found = true
}
}
if !found {
// we're gonna give the workers a chance to pickup the chunk
// and retry a couple of times
for i := 0; i < r.ReadRetries; i++ {
data, err = r.storage().GetChunk(r.cachedObject, chunkStart)
if err == nil {
found = true
break
}
fs.Debugf(r, "%v: chunk retry storage: %v", chunkStart, i)
time.Sleep(time.Second)
}
}
// not found in ram or
// the worker didn't managed to download the chunk in time so we abort and close the stream
if err != nil || len(data) == 0 || !found {
if !r.hasAtLeastOneWorker() {
fs.Errorf(r, "out of workers")
return nil, io.ErrUnexpectedEOF
}
return nil, errors.Errorf("chunk not found %v", chunkStart)
}
// first chunk will be aligned with the start
if offset > 0 {
data = data[int(offset):]
}
return data, nil
}
// Read a chunk from storage or len(p)
func (r *Handle) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
var buf []byte
currentOffset := r.offset
buf, err = r.getChunk(currentOffset)
if err != nil && len(buf) == 0 {
return 0, io.EOF
}
readSize := copy(p, buf)
newOffset := currentOffset + int64(readSize)
r.offset = newOffset
if r.offset >= r.cachedObject.Size() {
err = io.EOF
}
return readSize, err
}
// Close will tell the workers to stop
func (r *Handle) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.closed {
return errors.New("file already closed")
}
close(r.preloadQueue)
r.closed = true
// wait for workers to complete their jobs before returning
waitCount := 3
for i := 0; i < len(r.workers); i++ {
waitIdx := 0
for r.workers[i].isRunning() && waitIdx < waitCount {
time.Sleep(time.Second)
waitIdx++
}
}
fs.Debugf(r, "cache reader closed %v", r.offset)
return nil
}
// Seek will move the current offset based on whence and instruct the workers to move there too
func (r *Handle) Seek(offset int64, whence int) (int64, error) {
r.mu.Lock()
defer r.mu.Unlock()
var err error
switch whence {
case os.SEEK_SET:
fs.Debugf(r, "moving offset set from %v to %v", r.offset, offset)
r.offset = offset
case os.SEEK_CUR:
fs.Debugf(r, "moving offset cur from %v to %v", r.offset, r.offset+offset)
r.offset += offset
case os.SEEK_END:
fs.Debugf(r, "moving offset end (%v) from %v to %v", r.cachedObject.Size(), r.offset, r.cachedObject.Size()+offset)
r.offset = r.cachedObject.Size() + offset
default:
err = errors.Errorf("cache: unimplemented seek whence %v", whence)
}
chunkStart := r.offset - (r.offset % r.cacheFs().chunkSize)
if chunkStart >= r.cacheFs().chunkSize {
chunkStart = chunkStart - r.cacheFs().chunkSize
}
r.queueOffset(chunkStart)
return r.offset, err
}
type worker struct {
r *Handle
ch <-chan int64
rc io.ReadCloser
id int
running bool
mu sync.Mutex
}
// String is a representation of this worker
func (w *worker) String() string {
return fmt.Sprintf("worker-%v <%v>", w.id, w.r.cachedObject.Name)
}
// reader will return a reader depending on the capabilities of the source reader:
// - if it supports seeking it will seek to the desired offset and return the same reader
// - if it doesn't support seeking it will close a possible existing one and open at the desired offset
// - if there's no reader associated with this worker, it will create one
func (w *worker) reader(offset, end int64) (io.ReadCloser, error) {
var err error
r := w.rc
if w.rc == nil {
r, err = w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) {
return w.r.cachedObject.Object.Open(&fs.RangeOption{Start: offset, End: end}, &fs.SeekOption{Offset: offset})
})
if err != nil {
return nil, err
}
return r, nil
}
seekerObj, ok := r.(io.Seeker)
if ok {
_, err = seekerObj.Seek(offset, os.SEEK_SET)
return r, err
}
_ = w.rc.Close()
return w.r.cacheFs().OpenRateLimited(func() (io.ReadCloser, error) {
r, err = w.r.cachedObject.Object.Open(&fs.RangeOption{Start: offset, End: end}, &fs.SeekOption{Offset: offset})
if err != nil {
return nil, err
}
return r, nil
})
}
func (w *worker) isRunning() bool {
w.mu.Lock()
defer w.mu.Unlock()
return w.running
}
func (w *worker) setRunning(f bool) {
w.mu.Lock()
defer w.mu.Unlock()
w.running = f
}
// run is the main loop for the worker which receives offsets to preload
func (w *worker) run() {
var err error
var data []byte
defer w.setRunning(false)
defer func() {
if w.rc != nil {
_ = w.rc.Close()
w.setRunning(false)
}
}()
for {
chunkStart, open := <-w.ch
w.setRunning(true)
if chunkStart < 0 || !open {
break
}
// skip if it exists
if w.r.UseMemory {
if w.r.memory.HasChunk(w.r.cachedObject, chunkStart) {
continue
}
// add it in ram if it's in the persistent storage
data, err = w.r.storage().GetChunk(w.r.cachedObject, chunkStart)
if err == nil {
err = w.r.memory.AddChunk(w.r.cachedObject, data, chunkStart)
if err != nil {
fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err)
} else {
continue
}
}
err = nil
} else {
if w.r.storage().HasChunk(w.r.cachedObject, chunkStart) {
continue
}
}
chunkEnd := chunkStart + w.r.cacheFs().chunkSize
if chunkEnd > w.r.cachedObject.Size() {
chunkEnd = w.r.cachedObject.Size()
}
w.rc, err = w.reader(chunkStart, chunkEnd)
// we seem to be getting only errors so we abort
if err != nil {
fs.Errorf(w, "object open failed %v: %v", chunkStart, err)
return
}
data = make([]byte, chunkEnd-chunkStart)
sourceRead := 0
sourceRead, err = io.ReadFull(w.rc, data)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
fs.Errorf(w, "failed to read chunk %v: %v", chunkStart, err)
return
}
if err == io.ErrUnexpectedEOF {
fs.Debugf(w, "partial read chunk %v: %v", chunkStart, err)
}
data = data[:sourceRead] // reslice to remove extra garbage
fs.Debugf(w, "downloaded chunk %v", fs.SizeSuffix(chunkStart))
if w.r.UseMemory {
err = w.r.memory.AddChunk(w.r.cachedObject, data, chunkStart)
if err != nil {
fs.Errorf(w, "failed caching chunk in ram %v: %v", chunkStart, err)
}
}
err = w.r.storage().AddChunkAhead(w.r.cachedObject.abs(), data, chunkStart, w.r.chunkAge)
if err != nil {
fs.Errorf(w, "failed caching chunk in storage %v: %v", chunkStart, err)
}
}
}
// Check the interfaces are satisfied
var (
_ io.ReadCloser = (*Handle)(nil)
_ io.Seeker = (*Handle)(nil)
)