// rclone/backend/pikpak/multipart.go

package pikpak

import (
"context"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/rclone/rclone/backend/pikpak/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"golang.org/x/sync/errgroup"
)

const (
bufferSize = 1024 * 1024 // default size of the pages used in the reader
bufferCacheSize = 64 // max number of buffers to keep in cache
bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long
)

// bufferPool is a global pool of buffers
var (
bufferPool *pool.Pool
bufferPoolOnce sync.Once
)

// getPool returns the global buffer pool, creating it on first use
func getPool() *pool.Pool {
bufferPoolOnce.Do(func() {
ci := fs.GetConfig(context.Background())
// Initialise the buffer pool when used
bufferPool = pool.New(bufferCacheFlushTime, bufferSize, bufferCacheSize, ci.UseMmap)
})
return bufferPool
}

// NewRW gets a pool.RW using the multipart pool
func NewRW() *pool.RW {
return pool.NewRW(getPool())
}

// Upload does a multipart upload in parallel
func (w *pikpakChunkWriter) Upload(ctx context.Context) (err error) {
// make concurrency machinery
tokens := pacer.NewTokenDispenser(w.con)
uploadCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer atexit.OnError(&err, func() {
cancel()
fs.Debugf(w.o, "multipart upload: Cancelling...")
errCancel := w.Abort(ctx)
if errCancel != nil {
fs.Debugf(w.o, "multipart upload: failed to cancel: %v", errCancel)
}
})()
var (
g, gCtx = errgroup.WithContext(uploadCtx)
finished = false
off int64
size = w.size
chunkSize = w.chunkSize
)
// Do the accounting manually
in, acc := accounting.UnWrapAccounting(w.in)
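	// Read the input in chunkSize chunks and upload each one in a separate goroutine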
for partNum := int64(0); !finished; partNum++ {
// Get a block of memory from the pool and token which limits concurrency.
tokens.Get()
rw := NewRW()
if acc != nil {
rw.SetAccounting(acc.AccountRead)
}
free := func() {
// return the memory and token
_ = rw.Close() // Can't return an error
tokens.Put()
}
		// Fail fast: if any errgroup managed function has returned an error then
		// gCtx is cancelled and there is no point in uploading the remaining parts.
if gCtx.Err() != nil {
free()
break
}
// Read the chunk
var n int64
n, err = io.CopyN(rw, in, chunkSize)
if err == io.EOF {
if n == 0 && partNum != 0 { // end if no data and if not first chunk
free()
break
}
finished = true
} else if err != nil {
free()
return fmt.Errorf("multipart upload: failed to read source: %w", err)
}
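		// Take copies of partNum and off for the goroutine below to capture, then advance the offset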
partNum := partNum
partOff := off
off += n
g.Go(func() (err error) {
defer free()
fs.Debugf(w.o, "multipart upload: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(partOff), fs.SizeSuffix(size))
_, err = w.WriteChunk(gCtx, int32(partNum), rw)
return err
})
}
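	// Wait for all the outstanding uploads to finish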
err = g.Wait()
if err != nil {
return err
}
err = w.Close(ctx)
if err != nil {
return fmt.Errorf("multipart upload: failed to finalise: %w", err)
}
return nil
}
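
// warnStreamUpload makes sure the warning about the size limit of streaming uploads is only logged once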
var warnStreamUpload sync.Once

// pikpakChunkWriter holds the state of a multipart chunked upload
type pikpakChunkWriter struct {
	chunkSize      int64      // size of each part
	size           int64      // total size of the object, or -1 if unknown
	con            int        // number of parts to upload concurrently
	f              *Fs
	o              *Object
	in             io.Reader
	mu             sync.Mutex // protects completedParts
	completedParts []types.CompletedPart
	client         *s3.Client
	mOut           *s3.CreateMultipartUploadOutput
}
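
// newChunkWriter creates the multipart upload on the remote and returns a
// pikpakChunkWriter ready for parts to be written to it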
func (f *Fs) newChunkWriter(ctx context.Context, remote string, size int64, p *api.ResumableParams, in io.Reader, options ...fs.OpenOption) (w *pikpakChunkWriter, err error) {
// Temporary Object under construction
o := &Object{
fs: f,
remote: remote,
}
// calculate size of parts
chunkSize := f.opt.ChunkSize
	// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
	// buffers here (default 5 MiB). With a maximum number of parts (10,000) this gives a maximum
	// file size of 48 GiB, which seems a reasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
f.opt.ChunkSize, fs.SizeSuffix(int64(chunkSize)*int64(maxUploadParts)))
})
} else {
chunkSize = chunksize.Calculator(o, size, maxUploadParts, chunkSize)
}
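	// Create the S3 client for the upload from the resumable upload parameters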
client, err := f.newS3Client(ctx, p)
if err != nil {
return nil, fmt.Errorf("failed to create upload client: %w", err)
}
w = &pikpakChunkWriter{
chunkSize: int64(chunkSize),
size: size,
con: max(1, f.opt.UploadConcurrency),
f: f,
o: o,
in: in,
completedParts: make([]types.CompletedPart, 0),
client: client,
}
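	// Start building the CreateMultipartUpload request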
req := &s3.CreateMultipartUploadInput{
Bucket: &p.Bucket,
Key: &p.Key,
}
// Apply upload options
for _, option := range options {
key, value := option.Header()
lowerKey := strings.ToLower(key)
switch lowerKey {
case "":
// ignore
case "cache-control":
req.CacheControl = aws.String(value)
case "content-disposition":
req.ContentDisposition = aws.String(value)
case "content-encoding":
req.ContentEncoding = aws.String(value)
case "content-type":
req.ContentType = aws.String(value)
}
}
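	// Create the multipart upload on the remote, retrying as necessary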
err = w.f.pacer.Call(func() (bool, error) {
w.mOut, err = w.client.CreateMultipartUpload(ctx, req)
return w.shouldRetry(ctx, err)
})
if err != nil {
return nil, fmt.Errorf("create multipart upload failed: %w", err)
}
fs.Debugf(w.o, "multipart upload: %q initiated", *w.mOut.UploadId)
return
}

// shouldRetry returns a boolean as to whether this err deserves to be
// retried. It returns the err as a convenience.
func (w *pikpakChunkWriter) shouldRetry(ctx context.Context, err error) (bool, error) {
if fserrors.ContextError(ctx, &err) {
return false, err
}
if fserrors.ShouldRetry(err) {
return true, err
}
return false, err
}

// addCompletedPart adds a part number and ETag to the completed parts
func (w *pikpakChunkWriter) addCompletedPart(part types.CompletedPart) {
w.mu.Lock()
defer w.mu.Unlock()
w.completedParts = append(w.completedParts, part)
}

// WriteChunk uploads the chunk with the given chunkNumber (>= 0), reading its data from reader
func (w *pikpakChunkWriter) WriteChunk(ctx context.Context, chunkNumber int32, reader io.ReadSeeker) (currentChunkSize int64, err error) {
if chunkNumber < 0 {
err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
return -1, err
}
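	// S3 part numbers are 1 based whereas our chunk numbers are 0 based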
partNumber := chunkNumber + 1
var res *s3.UploadPartOutput
err = w.f.pacer.Call(func() (bool, error) {
// Discover the size by seeking to the end
currentChunkSize, err = reader.Seek(0, io.SeekEnd)
if err != nil {
return false, err
}
		// rewind the reader after discovering the size and on each retry
_, err := reader.Seek(0, io.SeekStart)
if err != nil {
return false, err
}
res, err = w.client.UploadPart(ctx, &s3.UploadPartInput{
Bucket: w.mOut.Bucket,
Key: w.mOut.Key,
UploadId: w.mOut.UploadId,
PartNumber: &partNumber,
Body: reader,
})
if err != nil {
if chunkNumber <= 8 {
return w.shouldRetry(ctx, err)
}
			// retry all chunks once the first few have been done
return true, err
}
return false, nil
})
if err != nil {
return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", partNumber, currentChunkSize, err)
}
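	// Remember the part so it can be included when the upload is completed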
w.addCompletedPart(types.CompletedPart{
PartNumber: &partNumber,
ETag: res.ETag,
})
fs.Debugf(w.o, "multipart upload: wrote chunk %d with %v bytes", partNumber, currentChunkSize)
return currentChunkSize, err
}

// Abort the multipart upload
func (w *pikpakChunkWriter) Abort(ctx context.Context) (err error) {
// Abort the upload session
err = w.f.pacer.Call(func() (bool, error) {
_, err = w.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: w.mOut.Bucket,
Key: w.mOut.Key,
UploadId: w.mOut.UploadId,
})
return w.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to abort multipart upload %q: %w", *w.mOut.UploadId, err)
}
fs.Debugf(w.o, "multipart upload: %q aborted", *w.mOut.UploadId)
return
}

// Close and finalise the multipart upload
func (w *pikpakChunkWriter) Close(ctx context.Context) (err error) {
// sort the completed parts by part number
sort.Slice(w.completedParts, func(i, j int) bool {
return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber
})
// Finalise the upload session
err = w.f.pacer.Call(func() (bool, error) {
_, err = w.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: w.mOut.Bucket,
Key: w.mOut.Key,
UploadId: w.mOut.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: w.completedParts,
},
})
return w.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to complete multipart upload: %w", err)
}
fs.Debugf(w.o, "multipart upload: %q finished", *w.mOut.UploadId)
return
}