1
0
mirror of https://github.com/rclone/rclone.git synced 2025-01-13 20:38:12 +02:00

azure-blob-storage: utilize streaming capabilities - #1614

This commit is contained in:
Denis Neuling 2019-09-10 17:35:25 +02:00 committed by Nick Craig-Wood
parent a7689d7023
commit ec73d2fb9a
3 changed files with 11 additions and 234 deletions

View File

@ -5,9 +5,7 @@
package azureblob
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/json"
@ -26,7 +24,6 @@ import (
"github.com/Azure/go-autorest/autorest/adal"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
@ -39,8 +36,6 @@ import (
"github.com/rclone/rclone/lib/env"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup"
)
const (
@ -51,15 +46,11 @@ const (
modTimeKey = "mtime"
timeFormatIn = time.RFC3339
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
maxTotalParts = 50000 // in multipart upload
storageDefaultBaseURL = "blob.core.windows.net"
// maxUncommittedSize = 9 << 30 // can't upload bigger than this
defaultChunkSize = 4 * fs.MebiByte
maxChunkSize = 100 * fs.MebiByte
defaultUploadCutoff = 256 * fs.MebiByte
maxUploadCutoff = 256 * fs.MebiByte
defaultAccessTier = azblob.AccessTierNone
maxTryTimeout = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
defaultChunkSize = 4 * fs.MebiByte
maxChunkSize = 100 * fs.MebiByte
defaultAccessTier = azblob.AccessTierNone
maxTryTimeout = time.Hour * 24 * 365 //max time of an azure web request response window (whether or not data is flowing)
// Default storage account, key and blob endpoint for emulator support,
// though it is a base64 key checked in here, it is publicly available secret.
emulatorAccount = "devstoreaccount1"
@ -137,8 +128,7 @@ msi_client_id, or msi_mi_res_id parameters.`,
Advanced: true,
}, {
Name: "upload_cutoff",
Help: "Cutoff for switching to chunked upload (<= 256MB).",
Default: defaultUploadCutoff,
Help: "Cutoff for switching to chunked upload (<= 256MB). (Deprecated)",
Advanced: true,
}, {
Name: "chunk_size",
@ -241,7 +231,6 @@ type Options struct {
MSIResourceID string `config:"msi_mi_res_id"`
Endpoint string `config:"endpoint"`
SASURL string `config:"sas_url"`
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
ListChunkSize uint `config:"list_chunk"`
AccessTier string `config:"access_tier"`
@ -397,21 +386,6 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
return
}
func checkUploadCutoff(cs fs.SizeSuffix) error {
if cs > maxUploadCutoff {
return errors.Errorf("%v must be less than or equal to %v", cs, maxUploadCutoff)
}
return nil
}
func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadCutoff(cs)
if err == nil {
old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
}
return
}
// httpClientFactory creates a Factory object that sends HTTP requests
// to an rclone's http.Client.
//
@ -506,10 +480,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
return nil, err
}
err = checkUploadCutoff(opt.UploadCutoff)
if err != nil {
return nil, errors.Wrap(err, "azure: upload cutoff")
}
err = checkUploadChunkSize(opt.ChunkSize)
if err != nil {
return nil, errors.Wrap(err, "azure: chunk size")
@ -1510,12 +1480,6 @@ func init() {
}
}
// readSeeker joins an io.Reader and an io.Seeker
type readSeeker struct {
io.Reader
io.Seeker
}
// increment the slice passed in as LSB binary
func increment(xs []byte) {
for i, digit := range xs {
@ -1528,143 +1492,6 @@ func increment(xs []byte) {
}
}
var warnStreamUpload sync.Once
// uploadMultipart uploads a file using multipart upload
//
// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, size int64, blob *azblob.BlobURL, httpHeaders *azblob.BlobHTTPHeaders) (err error) {
// Calculate correct chunkSize
chunkSize := int64(o.fs.opt.ChunkSize)
totalParts := -1
// Note that the max size of file is 4.75 TB (100 MB X 50,000
// blocks) and this is bigger than the max uncommitted block
// size (9.52 TB) so we do not need to part commit block lists
// or garbage collect uncommitted blocks.
//
// See: https://docs.microsoft.com/en-gb/rest/api/storageservices/put-block
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 4MB). With a maximum number of parts (50,000) this will be a file of
// 195GB which seems like a not too unreasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(o, "Streaming uploads using chunk size %v will have maximum file size of %v",
o.fs.opt.ChunkSize, fs.SizeSuffix(chunkSize*maxTotalParts))
})
} else {
// Adjust partSize until the number of parts is small enough.
if size/chunkSize >= maxTotalParts {
// Calculate partition size rounded up to the nearest MB
chunkSize = (((size / maxTotalParts) >> 20) + 1) << 20
}
if chunkSize > int64(maxChunkSize) {
return errors.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), totalParts, fs.SizeSuffix(chunkSize/2))
}
totalParts = int(size / chunkSize)
if size%chunkSize != 0 {
totalParts++
}
}
fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, fs.SizeSuffix(chunkSize))
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in, wrap := accounting.UnWrap(in)
// Upload the chunks
var (
g, gCtx = errgroup.WithContext(ctx)
remaining = size // remaining size in file for logging only, -1 if size < 0
position = int64(0) // position in file
memPool = o.fs.getMemoryPool(chunkSize) // pool to get memory from
finished = false // set when we have read EOF
blocks []string // list of blocks for finalize
blockBlobURL = blob.ToBlockBlobURL() // Get BlockBlobURL, we will use default pipeline here
ac = azblob.LeaseAccessConditions{} // Use default lease access conditions
binaryBlockID = make([]byte, 8) // block counter as LSB first 8 bytes
)
for part := 0; !finished; part++ {
// Get a block of memory from the pool and a token which limits concurrency
o.fs.uploadToken.Get()
buf := memPool.Get()
free := func() {
memPool.Put(buf) // return the buf
o.fs.uploadToken.Put() // return the token
}
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
free()
break
}
// Read the chunk
n, err := readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF {
if n == 0 { // end if no data
free()
break
}
finished = true
} else if err != nil {
free()
return errors.Wrap(err, "multipart upload failed to read source")
}
buf = buf[:n]
// increment the blockID and save the blocks for finalize
increment(binaryBlockID)
blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
blocks = append(blocks, blockID)
// Transfer the chunk
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, totalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
g.Go(func() (err error) {
defer free()
// Upload the block, with MD5 for check
md5sum := md5.Sum(buf)
transactionalMD5 := md5sum[:]
err = o.fs.pacer.Call(func() (bool, error) {
bufferReader := bytes.NewReader(buf)
wrappedReader := wrap(bufferReader)
rs := readSeeker{wrappedReader, bufferReader}
_, err = blockBlobURL.StageBlock(ctx, blockID, &rs, ac, transactionalMD5, azblob.ClientProvidedKeyOptions{})
return o.fs.shouldRetry(err)
})
if err != nil {
return errors.Wrap(err, "multipart upload failed to upload part")
}
return nil
})
// ready for next block
if size >= 0 {
remaining -= chunkSize
}
position += chunkSize
}
err = g.Wait()
if err != nil {
return err
}
// Finalise the upload session
err = o.fs.pacer.Call(func() (bool, error) {
_, err := blockBlobURL.CommitBlockList(ctx, blocks, *httpHeaders, o.meta, azblob.BlobAccessConditions{}, azblob.AccessTierType(o.fs.opt.AccessTier), nil, azblob.ClientProvidedKeyOptions{})
return o.fs.shouldRetry(err)
})
if err != nil {
return errors.Wrap(err, "multipart upload failed to finalize")
}
return nil
}
// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
@ -1685,7 +1512,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return err
}
size := src.Size()
// Update Mod time
o.updateMetadataWithModTime(src.ModTime(ctx))
if err != nil {
@ -1695,10 +1522,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
blob := o.getBlobReference()
httpHeaders := azblob.BlobHTTPHeaders{}
httpHeaders.ContentType = fs.MimeType(ctx, src)
// Compute the Content-MD5 of the file, for multiparts uploads it
// Compute the Content-MD5 of the file. As we stream all uploads it
// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
// Note: If multipart, an MD5 checksum will also be computed for each uploaded block
// in order to validate its integrity during transport
if !o.fs.opt.DisableCheckSum {
if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" {
sourceMD5bytes, err := hex.DecodeString(sourceMD5)
@ -1716,26 +1542,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
Metadata: o.meta,
BlobHTTPHeaders: httpHeaders,
}
// FIXME Until https://github.com/Azure/azure-storage-blob-go/pull/75
// is merged the SDK can't upload a single blob of exactly the chunk
// size, so upload with a multipart upload to work around.
// See: https://github.com/rclone/rclone/issues/2653
multipartUpload := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
if size == int64(o.fs.opt.ChunkSize) {
multipartUpload = true
fs.Debugf(o, "Setting multipart upload for file of chunk size (%d) to work around SDK bug", size)
}
// Don't retry, return a retry error instead
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
if multipartUpload {
// If a large file upload in chunks
err = o.uploadMultipart(ctx, in, size, &blob, &httpHeaders)
} else {
// Write a small blob in one transaction
blockBlobURL := blob.ToBlockBlobURL()
_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
}
// Stream contents of the reader object to the given blob URL
blockBlobURL := blob.ToBlockBlobURL()
_, err = azblob.UploadStreamToBlockBlob(ctx, in, blockBlobURL, putBlobOptions)
return o.fs.shouldRetry(err)
})
if err != nil {

View File

@ -29,13 +29,8 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadChunkSize(cs)
}
func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadCutoff(cs)
}
var (
_ fstests.SetUploadChunkSizer = (*Fs)(nil)
_ fstests.SetUploadCutoffer = (*Fs)(nil)
)
// TestServicePrincipalFileSuccess checks that, given a proper JSON file, we can create a token.

View File

@ -146,27 +146,6 @@ Container level SAS URLs are useful for temporarily allowing third
parties access to a single container or putting credentials into an
untrusted environment such as a CI build server.
### Multipart uploads ###
Rclone supports multipart uploads with Azure Blob storage. Files
bigger than 256MB will be uploaded using chunked upload by default.
The files will be uploaded in parallel in 4MB chunks (by default).
Note that these chunks are buffered in memory and there may be up to
`--transfers` of them being uploaded at once.
Files can't be split into more than 50,000 chunks so by default, so
the largest file that can be uploaded with 4MB chunk size is 195GB.
Above this rclone will double the chunk size until it creates less
than 50,000 chunks. By default this will mean a maximum file size of
3.2TB can be uploaded. This can be raised to 5TB using
`--azureblob-chunk-size 100M`.
Note that rclone doesn't commit the block list until the end of the
upload which means that there is a limit of 9.5TB of multipart uploads
in progress as Azure won't allow more than that amount of uncommitted
blocks.
{{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/azureblob/azureblob.go then run make backenddocs" >}}
### Standard Options
@ -223,15 +202,6 @@ Leave blank normally.
- Type: string
- Default: ""
#### --azureblob-upload-cutoff
Cutoff for switching to chunked upload (<= 256MB).
- Config: upload_cutoff
- Env Var: RCLONE_AZUREBLOB_UPLOAD_CUTOFF
- Type: SizeSuffix
- Default: 256M
#### --azureblob-chunk-size
Upload chunk size (<= 100MB).