From e5974ac4b0a847e864ebfc82e1bbb8077084ccd3 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 3 May 2022 17:39:01 +0100 Subject: [PATCH] s3: use PutObject from the aws SDK to upload single part objects Before this change rclone used presigned requests to upload single part objects. This was because of a limitation in the SDK which didn't allow non seekable io.Readers to be passed in. This is incompatible with some S3 backends, and rclone wasn't adding the `X-Amz-Content-Sha256: UNSIGNED-PAYLOAD` header which was incompatible with other S3 backends. The SDK now allows for this so rclone can use PutObject directly. This sets the `X-Amz-Content-Sha256: UNSIGNED-PAYLOAD` flag on the PUT request. However rclone will add a `Content-Md5` header if at all possible so the body data is still protected. Note that the old behaviour can still be configured if required with the `use_presigned_request` config parameter. Fixes #5422 --- backend/s3/s3.go | 261 +++++++++++++++++++++++++++++++-------------- docs/content/s3.md | 26 +++-- 2 files changed, 199 insertions(+), 88 deletions(-) diff --git a/backend/s3/s3.go b/backend/s3/s3.go index 81f071b92..cee3636b4 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -32,6 +32,7 @@ import ( "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" + v4 "github.com/aws/aws-sdk-go/aws/signer/v4" "github.com/aws/aws-sdk-go/service/s3" "github.com/ncw/swift/v2" "github.com/rclone/rclone/fs" @@ -1823,6 +1824,20 @@ This should be true, false or left unset to use the default for the provider. `, Default: fs.Tristate{}, Advanced: true, + }, { + Name: "use_presigned_request", + Help: `Whether to use a presigned request or PutObject for single part uploads + +If this is false rclone will use PutObject from the AWS SDK to upload +an object. + +Versions of rclone < 1.59 use presigned requests to upload a single +part object and setting this flag to true will re-enable that +functionality. This shouldn't be necessary except in exceptional +circumstances or for testing. +`, + Default: false, + Advanced: true, }, }}) } @@ -1888,6 +1903,7 @@ type Options struct { DisableHTTP2 bool `config:"disable_http2"` DownloadURL string `config:"download_url"` UseMultipartEtag fs.Tristate `config:"use_multipart_etag"` + UsePresignedRequest bool `config:"use_presigned_request"` } // Fs represents a remote s3 server @@ -1899,6 +1915,7 @@ type Fs struct { ctx context.Context // global context for reading config features *fs.Features // optional features c *s3.S3 // the connection to the s3 server + cu *s3.S3 // unsigned connection to the s3 server for PutObject ses *session.Session // the s3 session rootBucket string // bucket part of root (if any) rootDirectory string // directory part of root (if any) @@ -2031,7 +2048,11 @@ func getClient(ctx context.Context, opt *Options) *http.Client { } // s3Connection makes a connection to s3 -func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *session.Session, error) { +// +// If unsignedBody is set then the connection is configured for +// unsigned bodies which is necessary for PutObject if we don't want +// it to seek +func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *s3.S3, *session.Session, error) { ci := fs.GetConfig(ctx) // Make the auth v := credentials.Value{ @@ -2048,7 +2069,7 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // start a new AWS session awsSession, err := session.NewSession() if err != nil { - return nil, nil, fmt.Errorf("NewSession: %w", err) + return nil, nil, nil, fmt.Errorf("NewSession: %w", err) } // first provider to supply a credential set "wins" @@ -2088,9 +2109,9 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // if no access key/secret and iam is explicitly disabled then fall back to anon interaction cred = credentials.AnonymousCredentials case v.AccessKeyID == "": - return nil, nil, errors.New("access_key_id not found") + return nil, nil, nil, errors.New("access_key_id not found") case v.SecretAccessKey == "": - return nil, nil, errors.New("secret_access_key not found") + return nil, nil, nil, errors.New("secret_access_key not found") } if opt.Region == "" { @@ -2129,25 +2150,36 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S // (from the shared config file) if the passed-in Options.Config.Credentials is nil. awsSessionOpts.Config.Credentials = nil } + // Setting this stops PutObject reading the body twice and seeking + // We add our own Content-MD5 for data protection + awsSessionOpts.Config.S3DisableContentMD5Validation = aws.Bool(true) ses, err := session.NewSessionWithOptions(awsSessionOpts) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - c := s3.New(ses) - if opt.V2Auth || opt.Region == "other-v2-signature" { - fs.Debugf(nil, "Using v2 auth") - signer := func(req *request.Request) { - // Ignore AnonymousCredentials object - if req.Config.Credentials == credentials.AnonymousCredentials { - return + newC := func(unsignedBody bool) *s3.S3 { + c := s3.New(ses) + if opt.V2Auth || opt.Region == "other-v2-signature" { + fs.Debugf(nil, "Using v2 auth") + signer := func(req *request.Request) { + // Ignore AnonymousCredentials object + if req.Config.Credentials == credentials.AnonymousCredentials { + return + } + sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest) } - sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest) + c.Handlers.Sign.Clear() + c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) + c.Handlers.Sign.PushBack(signer) + } else if unsignedBody { + // If the body is unsigned then tell the signer that we aren't signing the payload + c.Handlers.Sign.Clear() + c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) + c.Handlers.Sign.PushBackNamed(v4.BuildNamedHandler("v4.SignRequestHandler.WithUnsignedPayload", v4.WithUnsignedPayload)) } - c.Handlers.Sign.Clear() - c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler) - c.Handlers.Sign.PushBack(signer) + return c } - return c, ses, nil + return newC(false), newC(true), ses, nil } func checkUploadChunkSize(cs fs.SizeSuffix) error { @@ -2329,7 +2361,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e opt.SSECustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:]) } srv := getClient(ctx, opt) - c, ses, err := s3Connection(ctx, opt, srv) + c, cu, ses, err := s3Connection(ctx, opt, srv) if err != nil { return nil, err } @@ -2347,6 +2379,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e ci: ci, ctx: ctx, c: c, + cu: cu, ses: ses, pacer: pc, cache: bucket.NewCache(), @@ -2470,11 +2503,12 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error { // Make a new session with the new region oldRegion := f.opt.Region f.opt.Region = region - c, ses, err := s3Connection(f.ctx, &f.opt, f.srv) + c, cu, ses, err := s3Connection(f.ctx, &f.opt, f.srv) if err != nil { return fmt.Errorf("creating new session failed: %w", err) } f.c = c + f.cu = cu f.ses = ses fs.Logf(f, "Switched region to %q from %q", region, oldRegion) @@ -3971,6 +4005,117 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si return etag, nil } +// unWrapAwsError unwraps AWS errors, looking for a non AWS error +// +// It returns true if one was found and the error, or false and the +// error passed in. +func unWrapAwsError(err error) (found bool, outErr error) { + if awsErr, ok := err.(awserr.Error); ok { + var origErrs []error + if batchErr, ok := awsErr.(awserr.BatchError); ok { + origErrs = batchErr.OrigErrs() + } else { + origErrs = []error{awsErr.OrigErr()} + } + for _, origErr := range origErrs { + found, newErr := unWrapAwsError(origErr) + if found { + return found, newErr + } + } + return false, err + } + return true, err +} + +// Upload a single part using PutObject +func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) { + req.Body = readers.NewFakeSeeker(in, size) + var resp *s3.PutObjectOutput + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + resp, err = o.fs.cu.PutObject(req) + return o.fs.shouldRetry(ctx, err) + }) + if err != nil { + // Return the underlying error if we have a Serialization error if possible + // + // Serialization errors are synthesized locally in the SDK (not returned from the + // server). We'll get one if the SDK attempts a retry, however the FakeSeeker will + // remember the previous error from Read and return that. + if do, ok := err.(awserr.Error); ok && do.Code() == request.ErrCodeSerialization { + if found, newErr := unWrapAwsError(err); found { + err = newErr + } + } + return etag, lastModified, err + } + lastModified = time.Now() + etag = aws.StringValue(resp.ETag) + return etag, lastModified, nil +} + +// Upload a single part using a presigned request +func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) { + // Create the request + putObj, _ := o.fs.c.PutObjectRequest(req) + + // Sign it so we can upload using a presigned request. + // + // Note the SDK didn't used to support streaming to + // PutObject so we used this work-around. + url, headers, err := putObj.PresignRequest(15 * time.Minute) + if err != nil { + return etag, lastModified, fmt.Errorf("s3 upload: sign request: %w", err) + } + + if o.fs.opt.V2Auth && headers == nil { + headers = putObj.HTTPRequest.Header + } + + // Set request to nil if empty so as not to make chunked encoding + if size == 0 { + in = nil + } + + // create the vanilla http request + httpReq, err := http.NewRequestWithContext(ctx, "PUT", url, in) + if err != nil { + return etag, lastModified, fmt.Errorf("s3 upload: new request: %w", err) + } + + // set the headers we signed and the length + httpReq.Header = headers + httpReq.ContentLength = size + + var resp *http.Response + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + var err error + resp, err = o.fs.srv.Do(httpReq) + if err != nil { + return o.fs.shouldRetry(ctx, err) + } + body, err := rest.ReadBody(resp) + if err != nil { + return o.fs.shouldRetry(ctx, err) + } + if resp.StatusCode >= 200 && resp.StatusCode < 299 { + return false, nil + } + err = fmt.Errorf("s3 upload: %s: %s", resp.Status, body) + return fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err + }) + if err != nil { + return etag, lastModified, err + } + if resp != nil { + if date, err := http.ParseTime(resp.Header.Get("Date")); err != nil { + lastModified = date + } + etag = resp.Header.Get("Etag") + } + return etag, lastModified, nil +} + // Update the Object from in with modTime and size func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { bucket, bucketPath := o.split() @@ -4022,6 +4167,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op ContentType: &mimeType, Metadata: metadata, } + if size >= 0 { + req.ContentLength = &size + } if md5sumBase64 != "" { req.ContentMD5 = &md5sumBase64 } @@ -4076,66 +4224,21 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } } - var resp *http.Response // response from PUT - var wantETag string // Multipart upload Etag to check + var wantETag string // Multipart upload Etag to check + var gotEtag string // Etag we got from the upload + var lastModified time.Time // Time we got from the upload if multipart { wantETag, err = o.uploadMultipart(ctx, &req, size, in) - if err != nil { - return err - } } else { - - // Create the request - putObj, _ := o.fs.c.PutObjectRequest(&req) - - // Sign it so we can upload using a presigned request. - // - // Note the SDK doesn't currently support streaming to - // PutObject so we'll use this work-around. - url, headers, err := putObj.PresignRequest(15 * time.Minute) - if err != nil { - return fmt.Errorf("s3 upload: sign request: %w", err) - } - - if o.fs.opt.V2Auth && headers == nil { - headers = putObj.HTTPRequest.Header - } - - // Set request to nil if empty so as not to make chunked encoding - if size == 0 { - in = nil - } - - // create the vanilla http request - httpReq, err := http.NewRequestWithContext(ctx, "PUT", url, in) - if err != nil { - return fmt.Errorf("s3 upload: new request: %w", err) - } - - // set the headers we signed and the length - httpReq.Header = headers - httpReq.ContentLength = size - - err = o.fs.pacer.CallNoRetry(func() (bool, error) { - var err error - resp, err = o.fs.srv.Do(httpReq) - if err != nil { - return o.fs.shouldRetry(ctx, err) - } - body, err := rest.ReadBody(resp) - if err != nil { - return o.fs.shouldRetry(ctx, err) - } - if resp.StatusCode >= 200 && resp.StatusCode < 299 { - return false, nil - } - err = fmt.Errorf("s3 upload: %s: %s", resp.Status, body) - return fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err - }) - if err != nil { - return err + if o.fs.opt.UsePresignedRequest { + gotEtag, lastModified, err = o.uploadSinglepartPresignedRequest(ctx, &req, size, in) + } else { + gotEtag, lastModified, err = o.uploadSinglepartPutObject(ctx, &req, size, in) } } + if err != nil { + return err + } // User requested we don't HEAD the object after uploading it // so make up the object as best we can assuming it got @@ -4148,11 +4251,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op o.mimeType = aws.StringValue(req.ContentType) o.storageClass = aws.StringValue(req.StorageClass) // If we have done a single part PUT request then we can read these - if resp != nil { - if date, err := http.ParseTime(resp.Header.Get("Date")); err == nil { - o.lastModified = date - } - o.setMD5FromEtag(resp.Header.Get("Etag")) + if gotEtag != "" { + o.setMD5FromEtag(gotEtag) + } + if !o.lastModified.IsZero() { + o.lastModified = lastModified } return nil } diff --git a/docs/content/s3.md b/docs/content/s3.md index f51f4b4a1..b9afa351b 100644 --- a/docs/content/s3.md +++ b/docs/content/s3.md @@ -2479,12 +2479,19 @@ You will be able to list and copy data but not upload it. This is the provider used as main example and described in the [configuration](#configuration) section above. ### AWS Snowball Edge -[AWS Snowball](https://aws.amazon.com/snowball/) is a hardware appliance used for transferring -bulk data back to AWS. Its main software interface is S3 object storage. -To use rclone with AWS Snowball Edge devices, configure as standard for an 'S3 Compatible Service' -be sure to set `upload_cutoff = 0` otherwise you will run into authentication header issues as -the snowball device does not support query parameter based authentication. +[AWS Snowball](https://aws.amazon.com/snowball/) is a hardware +appliance used for transferring bulk data back to AWS. Its main +software interface is S3 object storage. + +To use rclone with AWS Snowball Edge devices, configure as standard +for an 'S3 Compatible Service'. + +If using rclone pre v1.59 be sure to set `upload_cutoff = 0` otherwise +you will run into authentication header issues as the snowball device +does not support query parameter based authentication. + +With rclone v1.59 or later setting `upload_cutoff` should not be necessary. eg. ``` @@ -2523,10 +2530,11 @@ server_side_encryption = storage_class = ``` -If you are using an older version of CEPH, e.g. 10.2.x Jewel, then you -may need to supply the parameter `--s3-upload-cutoff 0` or put this in -the config file as `upload_cutoff 0` to work around a bug which causes -uploading of small files to fail. +If you are using an older version of CEPH (e.g. 10.2.x Jewel) and a +version of rclone before v1.59 then you may need to supply the +parameter `--s3-upload-cutoff 0` or put this in the config file as +`upload_cutoff 0` to work around a bug which causes uploading of small +files to fail. Note also that Ceph sometimes puts `/` in the passwords it gives users. If you read the secret access key using the command line tools