mirror of
https://github.com/rclone/rclone.git
synced 2025-01-13 20:38:12 +02:00
swift, b2, gcs, s3: Fix moveto and copyto
We now make sure the container/bucket is created before creating any objects.
This commit is contained in:
parent
b49821956a
commit
9c1e703777
16
b2/b2.go
16
b2/b2.go
@ -86,6 +86,8 @@ type Fs struct {
|
||||
endpoint string // name of the starting api endpoint
|
||||
srv *rest.Client // the connection to the b2 server
|
||||
bucket string // the bucket we are working on
|
||||
bucketOKMu sync.Mutex // mutex to protect bucket OK
|
||||
bucketOK bool // true if we have created the bucket
|
||||
bucketIDMutex sync.Mutex // mutex to protect _bucketID
|
||||
_bucketID string // the ID of the bucket we are working on
|
||||
info api.AuthorizeAccountResponse // result of authorize call
|
||||
@ -671,8 +673,9 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
|
||||
|
||||
// Mkdir creates the bucket if it doesn't exist
|
||||
func (f *Fs) Mkdir(dir string) error {
|
||||
// Can't create subdirs
|
||||
if dir != "" {
|
||||
f.bucketOKMu.Lock()
|
||||
defer f.bucketOKMu.Unlock()
|
||||
if f.bucketOK {
|
||||
return nil
|
||||
}
|
||||
opts := rest.Opts{
|
||||
@ -697,6 +700,7 @@ func (f *Fs) Mkdir(dir string) error {
|
||||
_, getBucketErr := f.getBucketID()
|
||||
if getBucketErr == nil {
|
||||
// found so it is our bucket
|
||||
f.bucketOK = true
|
||||
return nil
|
||||
}
|
||||
if getBucketErr != fs.ErrorDirNotFound {
|
||||
@ -707,6 +711,7 @@ func (f *Fs) Mkdir(dir string) error {
|
||||
return errors.Wrap(err, "failed to create bucket")
|
||||
}
|
||||
f.setBucketID(response.ID)
|
||||
f.bucketOK = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -714,6 +719,8 @@ func (f *Fs) Mkdir(dir string) error {
|
||||
//
|
||||
// Returns an error if it isn't empty
|
||||
func (f *Fs) Rmdir(dir string) error {
|
||||
f.bucketOKMu.Lock()
|
||||
defer f.bucketOKMu.Unlock()
|
||||
if f.root != "" || dir != "" {
|
||||
return nil
|
||||
}
|
||||
@ -737,6 +744,7 @@ func (f *Fs) Rmdir(dir string) error {
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to delete bucket")
|
||||
}
|
||||
f.bucketOK = false
|
||||
f.clearBucketID()
|
||||
f.clearUploadURL()
|
||||
return nil
|
||||
@ -1165,6 +1173,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
||||
if *b2Versions {
|
||||
return errNotWithVersions
|
||||
}
|
||||
err = o.fs.Mkdir("")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size := src.Size()
|
||||
|
||||
// If a large file upload in chunks - see upload.go
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
@ -135,6 +136,8 @@ type Fs struct {
|
||||
svc *storage.Service // the connection to the storage server
|
||||
client *http.Client // authorized client
|
||||
bucket string // the bucket we are working on
|
||||
bucketOKMu sync.Mutex // mutex to protect bucket OK
|
||||
bucketOK bool // true if we have created the bucket
|
||||
projectNumber string // used for finding buckets
|
||||
objectACL string // used when creating new objects
|
||||
bucketACL string // used when creating new buckets
|
||||
@ -456,13 +459,15 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
|
||||
|
||||
// Mkdir creates the bucket if it doesn't exist
|
||||
func (f *Fs) Mkdir(dir string) error {
|
||||
// Can't create subdirs
|
||||
if dir != "" {
|
||||
f.bucketOKMu.Lock()
|
||||
defer f.bucketOKMu.Unlock()
|
||||
if f.bucketOK {
|
||||
return nil
|
||||
}
|
||||
_, err := f.svc.Buckets.Get(f.bucket).Do()
|
||||
if err == nil {
|
||||
// Bucket already exists
|
||||
f.bucketOK = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -474,6 +479,9 @@ func (f *Fs) Mkdir(dir string) error {
|
||||
Name: f.bucket,
|
||||
}
|
||||
_, err = f.svc.Buckets.Insert(f.projectNumber, &bucket).PredefinedAcl(f.bucketACL).Do()
|
||||
if err == nil {
|
||||
f.bucketOK = true
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -482,10 +490,16 @@ func (f *Fs) Mkdir(dir string) error {
|
||||
// Returns an error if it isn't empty: Error 409: The bucket you tried
|
||||
// to delete was not empty.
|
||||
func (f *Fs) Rmdir(dir string) error {
|
||||
f.bucketOKMu.Lock()
|
||||
defer f.bucketOKMu.Unlock()
|
||||
if f.root != "" || dir != "" {
|
||||
return nil
|
||||
}
|
||||
return f.svc.Buckets.Delete(f.bucket).Do()
|
||||
err := f.svc.Buckets.Delete(f.bucket).Do()
|
||||
if err == nil {
|
||||
f.bucketOK = false
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Precision returns the precision
|
||||
@ -684,6 +698,10 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||
//
|
||||
// The new object may have been created if an error is returned
|
||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||
err := o.fs.Mkdir("")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size := src.Size()
|
||||
modTime := src.ModTime()
|
||||
|
||||
|
27
s3/s3.go
27
s3/s3.go
@ -21,6 +21,7 @@ import (
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
@ -239,6 +240,8 @@ type Fs struct {
|
||||
c *s3.S3 // the connection to the s3 server
|
||||
ses *session.Session // the s3 session
|
||||
bucket string // the bucket we are working on
|
||||
bucketOKMu sync.Mutex // mutex to protect bucket OK
|
||||
bucketOK bool // true if we have created the bucket
|
||||
acl string // ACL for new buckets / objects
|
||||
locationConstraint string // location constraint of new buckets
|
||||
sse string // the type of server-side encryption
|
||||
@ -652,11 +655,15 @@ func (f *Fs) dirExists() (bool, error) {
|
||||
|
||||
// Mkdir creates the bucket if it doesn't exist
|
||||
func (f *Fs) Mkdir(dir string) error {
|
||||
// Can't create subdirs
|
||||
if dir != "" {
|
||||
f.bucketOKMu.Lock()
|
||||
defer f.bucketOKMu.Unlock()
|
||||
if f.bucketOK {
|
||||
return nil
|
||||
}
|
||||
exists, err := f.dirExists()
|
||||
if err == nil {
|
||||
f.bucketOK = exists
|
||||
}
|
||||
if err != nil || exists {
|
||||
return err
|
||||
}
|
||||
@ -672,9 +679,12 @@ func (f *Fs) Mkdir(dir string) error {
|
||||
_, err = f.c.CreateBucket(&req)
|
||||
if err, ok := err.(awserr.Error); ok {
|
||||
if err.Code() == "BucketAlreadyOwnedByYou" {
|
||||
return nil
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
f.bucketOK = true
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -682,6 +692,8 @@ func (f *Fs) Mkdir(dir string) error {
|
||||
//
|
||||
// Returns an error if it isn't empty
|
||||
func (f *Fs) Rmdir(dir string) error {
|
||||
f.bucketOKMu.Lock()
|
||||
defer f.bucketOKMu.Unlock()
|
||||
if f.root != "" || dir != "" {
|
||||
return nil
|
||||
}
|
||||
@ -689,6 +701,9 @@ func (f *Fs) Rmdir(dir string) error {
|
||||
Bucket: &f.bucket,
|
||||
}
|
||||
_, err := f.c.DeleteBucket(&req)
|
||||
if err == nil {
|
||||
f.bucketOK = false
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -903,6 +918,10 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||
|
||||
// Update the Object from in with modTime and size
|
||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||
err := o.fs.Mkdir("")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
modTime := src.ModTime()
|
||||
|
||||
uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
|
||||
@ -943,7 +962,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
||||
if o.fs.storageClass != "" {
|
||||
req.StorageClass = &o.fs.storageClass
|
||||
}
|
||||
_, err := uploader.Upload(&req)
|
||||
_, err = uploader.Upload(&req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
@ -92,6 +93,8 @@ type Fs struct {
|
||||
features *fs.Features // optional features
|
||||
c *swift.Connection // the connection to the swift server
|
||||
container string // the container we are working on
|
||||
containerOKMu sync.Mutex // mutex to protect container OK
|
||||
containerOK bool // true if we have created the container
|
||||
segmentsContainer string // container to store the segments (if any) in
|
||||
}
|
||||
|
||||
@ -410,30 +413,36 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
|
||||
|
||||
// Mkdir creates the container if it doesn't exist
|
||||
func (f *Fs) Mkdir(dir string) error {
|
||||
// Can't create subdirs
|
||||
if dir != "" {
|
||||
f.containerOKMu.Lock()
|
||||
defer f.containerOKMu.Unlock()
|
||||
if f.containerOK {
|
||||
return nil
|
||||
}
|
||||
// Check to see if container exists first
|
||||
_, _, err := f.c.Container(f.container)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if err == swift.ContainerNotFound {
|
||||
return f.c.ContainerCreate(f.container, nil)
|
||||
err = f.c.ContainerCreate(f.container, nil)
|
||||
}
|
||||
if err == nil {
|
||||
f.containerOK = true
|
||||
}
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
// Rmdir deletes the container if the fs is at the root
|
||||
//
|
||||
// Returns an error if it isn't empty
|
||||
func (f *Fs) Rmdir(dir string) error {
|
||||
f.containerOKMu.Lock()
|
||||
defer f.containerOKMu.Unlock()
|
||||
if f.root != "" || dir != "" {
|
||||
return nil
|
||||
}
|
||||
return f.c.ContainerDelete(f.container)
|
||||
err := f.c.ContainerDelete(f.container)
|
||||
if err == nil {
|
||||
f.containerOK = false
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Precision of the remote
|
||||
@ -738,6 +747,10 @@ func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, c
|
||||
//
|
||||
// The new object may have been created if an error is returned
|
||||
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||
err := o.fs.Mkdir("")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size := src.Size()
|
||||
modTime := src.ModTime()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user