mirror of
https://github.com/rclone/rclone.git
synced 2025-01-13 20:38:12 +02:00
s3: use PutObject from the aws SDK to upload single part objects
Before this change rclone used presigned requests to upload single part objects. This was because of a limitation in the SDK which didn't allow non seekable io.Readers to be passed in. This is incompatible with some S3 backends, and rclone wasn't adding the `X-Amz-Content-Sha256: UNSIGNED-PAYLOAD` header which was incompatible with other S3 backends. The SDK now allows for this so rclone can use PutObject directly. This sets the `X-Amz-Content-Sha256: UNSIGNED-PAYLOAD` flag on the PUT request. However rclone will add a `Content-Md5` header if at all possible so the body data is still protected. Note that the old behaviour can still be configured if required with the `use_presigned_request` config parameter. Fixes #5422
This commit is contained in:
parent
50a0c3482d
commit
e5974ac4b0
229
backend/s3/s3.go
229
backend/s3/s3.go
@ -32,6 +32,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws/endpoints"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/ncw/swift/v2"
|
||||
"github.com/rclone/rclone/fs"
|
||||
@ -1823,6 +1824,20 @@ This should be true, false or left unset to use the default for the provider.
|
||||
`,
|
||||
Default: fs.Tristate{},
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: "use_presigned_request",
|
||||
Help: `Whether to use a presigned request or PutObject for single part uploads
|
||||
|
||||
If this is false rclone will use PutObject from the AWS SDK to upload
|
||||
an object.
|
||||
|
||||
Versions of rclone < 1.59 use presigned requests to upload a single
|
||||
part object and setting this flag to true will re-enable that
|
||||
functionality. This shouldn't be necessary except in exceptional
|
||||
circumstances or for testing.
|
||||
`,
|
||||
Default: false,
|
||||
Advanced: true,
|
||||
},
|
||||
}})
|
||||
}
|
||||
@ -1888,6 +1903,7 @@ type Options struct {
|
||||
DisableHTTP2 bool `config:"disable_http2"`
|
||||
DownloadURL string `config:"download_url"`
|
||||
UseMultipartEtag fs.Tristate `config:"use_multipart_etag"`
|
||||
UsePresignedRequest bool `config:"use_presigned_request"`
|
||||
}
|
||||
|
||||
// Fs represents a remote s3 server
|
||||
@ -1899,6 +1915,7 @@ type Fs struct {
|
||||
ctx context.Context // global context for reading config
|
||||
features *fs.Features // optional features
|
||||
c *s3.S3 // the connection to the s3 server
|
||||
cu *s3.S3 // unsigned connection to the s3 server for PutObject
|
||||
ses *session.Session // the s3 session
|
||||
rootBucket string // bucket part of root (if any)
|
||||
rootDirectory string // directory part of root (if any)
|
||||
@ -2031,7 +2048,11 @@ func getClient(ctx context.Context, opt *Options) *http.Client {
|
||||
}
|
||||
|
||||
// s3Connection makes a connection to s3
|
||||
func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *session.Session, error) {
|
||||
//
|
||||
// If unsignedBody is set then the connection is configured for
|
||||
// unsigned bodies which is necessary for PutObject if we don't want
|
||||
// it to seek
|
||||
func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S3, *s3.S3, *session.Session, error) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
// Make the auth
|
||||
v := credentials.Value{
|
||||
@ -2048,7 +2069,7 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S
|
||||
// start a new AWS session
|
||||
awsSession, err := session.NewSession()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("NewSession: %w", err)
|
||||
return nil, nil, nil, fmt.Errorf("NewSession: %w", err)
|
||||
}
|
||||
|
||||
// first provider to supply a credential set "wins"
|
||||
@ -2088,9 +2109,9 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S
|
||||
// if no access key/secret and iam is explicitly disabled then fall back to anon interaction
|
||||
cred = credentials.AnonymousCredentials
|
||||
case v.AccessKeyID == "":
|
||||
return nil, nil, errors.New("access_key_id not found")
|
||||
return nil, nil, nil, errors.New("access_key_id not found")
|
||||
case v.SecretAccessKey == "":
|
||||
return nil, nil, errors.New("secret_access_key not found")
|
||||
return nil, nil, nil, errors.New("secret_access_key not found")
|
||||
}
|
||||
|
||||
if opt.Region == "" {
|
||||
@ -2129,10 +2150,14 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S
|
||||
// (from the shared config file) if the passed-in Options.Config.Credentials is nil.
|
||||
awsSessionOpts.Config.Credentials = nil
|
||||
}
|
||||
// Setting this stops PutObject reading the body twice and seeking
|
||||
// We add our own Content-MD5 for data protection
|
||||
awsSessionOpts.Config.S3DisableContentMD5Validation = aws.Bool(true)
|
||||
ses, err := session.NewSessionWithOptions(awsSessionOpts)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
newC := func(unsignedBody bool) *s3.S3 {
|
||||
c := s3.New(ses)
|
||||
if opt.V2Auth || opt.Region == "other-v2-signature" {
|
||||
fs.Debugf(nil, "Using v2 auth")
|
||||
@ -2146,8 +2171,15 @@ func s3Connection(ctx context.Context, opt *Options, client *http.Client) (*s3.S
|
||||
c.Handlers.Sign.Clear()
|
||||
c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler)
|
||||
c.Handlers.Sign.PushBack(signer)
|
||||
} else if unsignedBody {
|
||||
// If the body is unsigned then tell the signer that we aren't signing the payload
|
||||
c.Handlers.Sign.Clear()
|
||||
c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler)
|
||||
c.Handlers.Sign.PushBackNamed(v4.BuildNamedHandler("v4.SignRequestHandler.WithUnsignedPayload", v4.WithUnsignedPayload))
|
||||
}
|
||||
return c, ses, nil
|
||||
return c
|
||||
}
|
||||
return newC(false), newC(true), ses, nil
|
||||
}
|
||||
|
||||
func checkUploadChunkSize(cs fs.SizeSuffix) error {
|
||||
@ -2329,7 +2361,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||
opt.SSECustomerKeyMD5 = base64.StdEncoding.EncodeToString(md5sumBinary[:])
|
||||
}
|
||||
srv := getClient(ctx, opt)
|
||||
c, ses, err := s3Connection(ctx, opt, srv)
|
||||
c, cu, ses, err := s3Connection(ctx, opt, srv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -2347,6 +2379,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||
ci: ci,
|
||||
ctx: ctx,
|
||||
c: c,
|
||||
cu: cu,
|
||||
ses: ses,
|
||||
pacer: pc,
|
||||
cache: bucket.NewCache(),
|
||||
@ -2470,11 +2503,12 @@ func (f *Fs) updateRegionForBucket(ctx context.Context, bucket string) error {
|
||||
// Make a new session with the new region
|
||||
oldRegion := f.opt.Region
|
||||
f.opt.Region = region
|
||||
c, ses, err := s3Connection(f.ctx, &f.opt, f.srv)
|
||||
c, cu, ses, err := s3Connection(f.ctx, &f.opt, f.srv)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating new session failed: %w", err)
|
||||
}
|
||||
f.c = c
|
||||
f.cu = cu
|
||||
f.ses = ses
|
||||
|
||||
fs.Logf(f, "Switched region to %q from %q", region, oldRegion)
|
||||
@ -3971,6 +4005,117 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
||||
return etag, nil
|
||||
}
|
||||
|
||||
// unWrapAwsError unwraps AWS errors, looking for a non AWS error
|
||||
//
|
||||
// It returns true if one was found and the error, or false and the
|
||||
// error passed in.
|
||||
func unWrapAwsError(err error) (found bool, outErr error) {
|
||||
if awsErr, ok := err.(awserr.Error); ok {
|
||||
var origErrs []error
|
||||
if batchErr, ok := awsErr.(awserr.BatchError); ok {
|
||||
origErrs = batchErr.OrigErrs()
|
||||
} else {
|
||||
origErrs = []error{awsErr.OrigErr()}
|
||||
}
|
||||
for _, origErr := range origErrs {
|
||||
found, newErr := unWrapAwsError(origErr)
|
||||
if found {
|
||||
return found, newErr
|
||||
}
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, err
|
||||
}
|
||||
|
||||
// Upload a single part using PutObject
|
||||
func (o *Object) uploadSinglepartPutObject(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) {
|
||||
req.Body = readers.NewFakeSeeker(in, size)
|
||||
var resp *s3.PutObjectOutput
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err = o.fs.cu.PutObject(req)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
// Return the underlying error if we have a Serialization error if possible
|
||||
//
|
||||
// Serialization errors are synthesized locally in the SDK (not returned from the
|
||||
// server). We'll get one if the SDK attempts a retry, however the FakeSeeker will
|
||||
// remember the previous error from Read and return that.
|
||||
if do, ok := err.(awserr.Error); ok && do.Code() == request.ErrCodeSerialization {
|
||||
if found, newErr := unWrapAwsError(err); found {
|
||||
err = newErr
|
||||
}
|
||||
}
|
||||
return etag, lastModified, err
|
||||
}
|
||||
lastModified = time.Now()
|
||||
etag = aws.StringValue(resp.ETag)
|
||||
return etag, lastModified, nil
|
||||
}
|
||||
|
||||
// Upload a single part using a presigned request
|
||||
func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, lastModified time.Time, err error) {
|
||||
// Create the request
|
||||
putObj, _ := o.fs.c.PutObjectRequest(req)
|
||||
|
||||
// Sign it so we can upload using a presigned request.
|
||||
//
|
||||
// Note the SDK didn't used to support streaming to
|
||||
// PutObject so we used this work-around.
|
||||
url, headers, err := putObj.PresignRequest(15 * time.Minute)
|
||||
if err != nil {
|
||||
return etag, lastModified, fmt.Errorf("s3 upload: sign request: %w", err)
|
||||
}
|
||||
|
||||
if o.fs.opt.V2Auth && headers == nil {
|
||||
headers = putObj.HTTPRequest.Header
|
||||
}
|
||||
|
||||
// Set request to nil if empty so as not to make chunked encoding
|
||||
if size == 0 {
|
||||
in = nil
|
||||
}
|
||||
|
||||
// create the vanilla http request
|
||||
httpReq, err := http.NewRequestWithContext(ctx, "PUT", url, in)
|
||||
if err != nil {
|
||||
return etag, lastModified, fmt.Errorf("s3 upload: new request: %w", err)
|
||||
}
|
||||
|
||||
// set the headers we signed and the length
|
||||
httpReq.Header = headers
|
||||
httpReq.ContentLength = size
|
||||
|
||||
var resp *http.Response
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
var err error
|
||||
resp, err = o.fs.srv.Do(httpReq)
|
||||
if err != nil {
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
}
|
||||
body, err := rest.ReadBody(resp)
|
||||
if err != nil {
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
}
|
||||
if resp.StatusCode >= 200 && resp.StatusCode < 299 {
|
||||
return false, nil
|
||||
}
|
||||
err = fmt.Errorf("s3 upload: %s: %s", resp.Status, body)
|
||||
return fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
|
||||
})
|
||||
if err != nil {
|
||||
return etag, lastModified, err
|
||||
}
|
||||
if resp != nil {
|
||||
if date, err := http.ParseTime(resp.Header.Get("Date")); err != nil {
|
||||
lastModified = date
|
||||
}
|
||||
etag = resp.Header.Get("Etag")
|
||||
}
|
||||
return etag, lastModified, nil
|
||||
}
|
||||
|
||||
// Update the Object from in with modTime and size
|
||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||
bucket, bucketPath := o.split()
|
||||
@ -4022,6 +4167,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
ContentType: &mimeType,
|
||||
Metadata: metadata,
|
||||
}
|
||||
if size >= 0 {
|
||||
req.ContentLength = &size
|
||||
}
|
||||
if md5sumBase64 != "" {
|
||||
req.ContentMD5 = &md5sumBase64
|
||||
}
|
||||
@ -4076,66 +4224,21 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
}
|
||||
}
|
||||
|
||||
var resp *http.Response // response from PUT
|
||||
var wantETag string // Multipart upload Etag to check
|
||||
var gotEtag string // Etag we got from the upload
|
||||
var lastModified time.Time // Time we got from the upload
|
||||
if multipart {
|
||||
wantETag, err = o.uploadMultipart(ctx, &req, size, in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
|
||||
// Create the request
|
||||
putObj, _ := o.fs.c.PutObjectRequest(&req)
|
||||
|
||||
// Sign it so we can upload using a presigned request.
|
||||
//
|
||||
// Note the SDK doesn't currently support streaming to
|
||||
// PutObject so we'll use this work-around.
|
||||
url, headers, err := putObj.PresignRequest(15 * time.Minute)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3 upload: sign request: %w", err)
|
||||
if o.fs.opt.UsePresignedRequest {
|
||||
gotEtag, lastModified, err = o.uploadSinglepartPresignedRequest(ctx, &req, size, in)
|
||||
} else {
|
||||
gotEtag, lastModified, err = o.uploadSinglepartPutObject(ctx, &req, size, in)
|
||||
}
|
||||
|
||||
if o.fs.opt.V2Auth && headers == nil {
|
||||
headers = putObj.HTTPRequest.Header
|
||||
}
|
||||
|
||||
// Set request to nil if empty so as not to make chunked encoding
|
||||
if size == 0 {
|
||||
in = nil
|
||||
}
|
||||
|
||||
// create the vanilla http request
|
||||
httpReq, err := http.NewRequestWithContext(ctx, "PUT", url, in)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3 upload: new request: %w", err)
|
||||
}
|
||||
|
||||
// set the headers we signed and the length
|
||||
httpReq.Header = headers
|
||||
httpReq.ContentLength = size
|
||||
|
||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||
var err error
|
||||
resp, err = o.fs.srv.Do(httpReq)
|
||||
if err != nil {
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
}
|
||||
body, err := rest.ReadBody(resp)
|
||||
if err != nil {
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
}
|
||||
if resp.StatusCode >= 200 && resp.StatusCode < 299 {
|
||||
return false, nil
|
||||
}
|
||||
err = fmt.Errorf("s3 upload: %s: %s", resp.Status, body)
|
||||
return fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// User requested we don't HEAD the object after uploading it
|
||||
// so make up the object as best we can assuming it got
|
||||
@ -4148,11 +4251,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
o.mimeType = aws.StringValue(req.ContentType)
|
||||
o.storageClass = aws.StringValue(req.StorageClass)
|
||||
// If we have done a single part PUT request then we can read these
|
||||
if resp != nil {
|
||||
if date, err := http.ParseTime(resp.Header.Get("Date")); err == nil {
|
||||
o.lastModified = date
|
||||
if gotEtag != "" {
|
||||
o.setMD5FromEtag(gotEtag)
|
||||
}
|
||||
o.setMD5FromEtag(resp.Header.Get("Etag"))
|
||||
if !o.lastModified.IsZero() {
|
||||
o.lastModified = lastModified
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -2479,12 +2479,19 @@ You will be able to list and copy data but not upload it.
|
||||
This is the provider used as main example and described in the [configuration](#configuration) section above.
|
||||
|
||||
### AWS Snowball Edge
|
||||
[AWS Snowball](https://aws.amazon.com/snowball/) is a hardware appliance used for transferring
|
||||
bulk data back to AWS. Its main software interface is S3 object storage.
|
||||
|
||||
To use rclone with AWS Snowball Edge devices, configure as standard for an 'S3 Compatible Service'
|
||||
be sure to set `upload_cutoff = 0` otherwise you will run into authentication header issues as
|
||||
the snowball device does not support query parameter based authentication.
|
||||
[AWS Snowball](https://aws.amazon.com/snowball/) is a hardware
|
||||
appliance used for transferring bulk data back to AWS. Its main
|
||||
software interface is S3 object storage.
|
||||
|
||||
To use rclone with AWS Snowball Edge devices, configure as standard
|
||||
for an 'S3 Compatible Service'.
|
||||
|
||||
If using rclone pre v1.59 be sure to set `upload_cutoff = 0` otherwise
|
||||
you will run into authentication header issues as the snowball device
|
||||
does not support query parameter based authentication.
|
||||
|
||||
With rclone v1.59 or later setting `upload_cutoff` should not be necessary.
|
||||
|
||||
eg.
|
||||
```
|
||||
@ -2523,10 +2530,11 @@ server_side_encryption =
|
||||
storage_class =
|
||||
```
|
||||
|
||||
If you are using an older version of CEPH, e.g. 10.2.x Jewel, then you
|
||||
may need to supply the parameter `--s3-upload-cutoff 0` or put this in
|
||||
the config file as `upload_cutoff 0` to work around a bug which causes
|
||||
uploading of small files to fail.
|
||||
If you are using an older version of CEPH (e.g. 10.2.x Jewel) and a
|
||||
version of rclone before v1.59 then you may need to supply the
|
||||
parameter `--s3-upload-cutoff 0` or put this in the config file as
|
||||
`upload_cutoff 0` to work around a bug which causes uploading of small
|
||||
files to fail.
|
||||
|
||||
Note also that Ceph sometimes puts `/` in the passwords it gives
|
||||
users. If you read the secret access key using the command line tools
|
||||
|
Loading…
Reference in New Issue
Block a user