|
|
|
@ -1,5 +1,8 @@
|
|
|
|
|
// Package qingstor provides an interface to QingStor object storage
|
|
|
|
|
// Home: https://www.qingcloud.com/
|
|
|
|
|
|
|
|
|
|
// +build !plan9
|
|
|
|
|
|
|
|
|
|
package qingstor
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
@ -92,8 +95,9 @@ type Fs struct {
|
|
|
|
|
name string // The name of the remote
|
|
|
|
|
zone string // The zone we are working on
|
|
|
|
|
bucket string // The bucket we are working on
|
|
|
|
|
bucketOKMu sync.Mutex // mutex to protect bucketOK and bucketDeleted
|
|
|
|
|
bucketOK bool // true if we have created the bucket
|
|
|
|
|
bucketMtx sync.Mutex // mutex to protect bucket
|
|
|
|
|
bucketDeleted bool // true if we have deleted the bucket
|
|
|
|
|
root string // The root is a subdir, is a special object
|
|
|
|
|
features *fs.Features // optional features
|
|
|
|
|
svc *qs.Service // The connection to the qingstor server
|
|
|
|
@ -206,6 +210,7 @@ func qsServiceConnection(name string) (*qs.Service, error) {
|
|
|
|
|
cf.Host = host
|
|
|
|
|
cf.Port = port
|
|
|
|
|
cf.ConnectionRetries = connectionRetries
|
|
|
|
|
cf.Connection = fs.Config.Client()
|
|
|
|
|
|
|
|
|
|
svc, _ := qs.Init(cf)
|
|
|
|
|
|
|
|
|
@ -320,6 +325,10 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.
|
|
|
|
|
//
|
|
|
|
|
// If it isn't possible then return fs.ErrorCantCopy
|
|
|
|
|
func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
|
|
|
|
|
err := f.Mkdir("")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
srcObj, ok := src.(*Object)
|
|
|
|
|
if !ok {
|
|
|
|
|
fs.Debugf(src, "Can't copy - not same remote type")
|
|
|
|
@ -329,7 +338,7 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
|
|
|
|
|
key := f.root + remote
|
|
|
|
|
source := path.Join("/"+srcFs.bucket, srcFs.root+srcObj.remote)
|
|
|
|
|
|
|
|
|
|
fs.Debugf(f, fmt.Sprintf("Copied, source key is: %s, and dst key is: %s", source, key))
|
|
|
|
|
fs.Debugf(f, "Copied, source key is: %s, and dst key is: %s", source, key)
|
|
|
|
|
req := qs.PutObjectInput{
|
|
|
|
|
XQSCopySource: &source,
|
|
|
|
|
}
|
|
|
|
@ -340,7 +349,7 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
|
|
|
|
|
}
|
|
|
|
|
_, err = bucketInit.PutObject(key, &req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Debugf(f, fmt.Sprintf("Copied Faild, API Error: %s", err))
|
|
|
|
|
fs.Debugf(f, "Copied Faild, API Error: %v", err)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return f.NewObject(remote)
|
|
|
|
@ -612,12 +621,13 @@ func (f *Fs) dirExists() (bool, error) {
|
|
|
|
|
|
|
|
|
|
// Mkdir creates the bucket if it doesn't exist
|
|
|
|
|
func (f *Fs) Mkdir(dir string) error {
|
|
|
|
|
f.bucketMtx.Lock()
|
|
|
|
|
defer f.bucketMtx.Unlock()
|
|
|
|
|
f.bucketOKMu.Lock()
|
|
|
|
|
defer f.bucketOKMu.Unlock()
|
|
|
|
|
if f.bucketOK {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !f.bucketDeleted {
|
|
|
|
|
exists, err := f.dirExists()
|
|
|
|
|
if err == nil {
|
|
|
|
|
f.bucketOK = exists
|
|
|
|
@ -625,6 +635,7 @@ func (f *Fs) Mkdir(dir string) error {
|
|
|
|
|
if err != nil || exists {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -639,6 +650,7 @@ func (f *Fs) Mkdir(dir string) error {
|
|
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
f.bucketOK = true
|
|
|
|
|
f.bucketDeleted = false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
@ -668,8 +680,8 @@ func (f *Fs) dirIsEmpty() (bool, error) {
|
|
|
|
|
|
|
|
|
|
// Rmdir delete a bucket
|
|
|
|
|
func (f *Fs) Rmdir(dir string) error {
|
|
|
|
|
f.bucketMtx.Lock()
|
|
|
|
|
defer f.bucketMtx.Unlock()
|
|
|
|
|
f.bucketOKMu.Lock()
|
|
|
|
|
defer f.bucketOKMu.Unlock()
|
|
|
|
|
if f.root != "" || dir != "" {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -679,11 +691,11 @@ func (f *Fs) Rmdir(dir string) error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if !isEmpty {
|
|
|
|
|
fs.Debugf(f, fmt.Sprintf("The bucket %s you tried to delete not empty.", f.bucket))
|
|
|
|
|
fs.Debugf(f, "The bucket %s you tried to delete not empty.", f.bucket)
|
|
|
|
|
return errors.New("BucketNotEmpty: The bucket you tried to delete is not empty")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fs.Debugf(f, fmt.Sprintf("Tried to delete the bucket %s", f.bucket))
|
|
|
|
|
fs.Debugf(f, "Tried to delete the bucket %s", f.bucket)
|
|
|
|
|
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
@ -691,6 +703,7 @@ func (f *Fs) Rmdir(dir string) error {
|
|
|
|
|
_, err = bucketInit.Delete()
|
|
|
|
|
if err == nil {
|
|
|
|
|
f.bucketOK = false
|
|
|
|
|
f.bucketDeleted = true
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -705,10 +718,10 @@ func (o *Object) readMetaData() (err error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
key := o.fs.root + o.remote
|
|
|
|
|
fs.Debugf(o, fmt.Sprintf("Read metadata of key: %s", key))
|
|
|
|
|
fs.Debugf(o, "Read metadata of key: %s", key)
|
|
|
|
|
resp, err := bucketInit.HeadObject(key, &qs.HeadObjectInput{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Debugf(o, fmt.Sprintf("Read metadata faild, API Error: %s", err))
|
|
|
|
|
fs.Debugf(o, "Read metadata faild, API Error: %v", err)
|
|
|
|
|
if e, ok := err.(*qsErr.QingStorError); ok {
|
|
|
|
|
if e.StatusCode == http.StatusNotFound {
|
|
|
|
|
return fs.ErrorObjectNotFound
|
|
|
|
@ -834,10 +847,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Errorf(o, fmt.Sprintf("Create Object Faild, API ERROR: %s", err))
|
|
|
|
|
fs.Errorf(o, "Create Object Faild, API ERROR: %v", err)
|
|
|
|
|
// Abort Upload when init success and upload failed
|
|
|
|
|
if uploadID != nil {
|
|
|
|
|
fs.Debugf(o, fmt.Sprintf("Abort Upload Multipart, upload_id: %s, objectParts: %s", *uploadID, objectParts))
|
|
|
|
|
fs.Debugf(o, "Abort Upload Multipart, upload_id: %s, objectParts: %s", *uploadID, objectParts)
|
|
|
|
|
abortReq := qs.AbortMultipartUploadInput{
|
|
|
|
|
UploadID: uploadID,
|
|
|
|
|
}
|
|
|
|
@ -846,7 +859,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
fs.Debugf(o, fmt.Sprintf("Initiate Upload Multipart, key: %s", key))
|
|
|
|
|
fs.Debugf(o, "Initiate Upload Multipart, key: %s", key)
|
|
|
|
|
mimeType := fs.MimeType(src)
|
|
|
|
|
initReq := qs.InitiateMultipartUploadInput{
|
|
|
|
|
ContentType: &mimeType,
|
|
|
|
@ -877,7 +890,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
|
|
|
|
ContentLength: &size,
|
|
|
|
|
Body: buffer,
|
|
|
|
|
}
|
|
|
|
|
fs.Debugf(o, fmt.Sprintf("Upload Multipart, upload_id: %s, part_number: %d", *uploadID, number))
|
|
|
|
|
fs.Debugf(o, "Upload Multipart, upload_id: %s, part_number: %d", *uploadID, number)
|
|
|
|
|
_, err = bucketInit.UploadMultipart(key, &req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
@ -891,7 +904,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Complete Multipart Upload
|
|
|
|
|
fs.Debugf(o, fmt.Sprintf("Complete Upload Multipart, upload_id: %s, objectParts: %d", *uploadID, objectParts))
|
|
|
|
|
fs.Debugf(o, "Complete Upload Multipart, upload_id: %s, objectParts: %d", *uploadID, objectParts)
|
|
|
|
|
completeReq := qs.CompleteMultipartUploadInput{
|
|
|
|
|
UploadID: uploadID,
|
|
|
|
|
ObjectParts: objectParts,
|
|
|
|
@ -923,10 +936,11 @@ func (o *Object) Fs() fs.Info {
|
|
|
|
|
return o.fs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
|
|
|
|
|
|
|
|
|
|
// Hash returns the selected checksum of the file
|
|
|
|
|
// If no checksum is available it returns ""
|
|
|
|
|
func (o *Object) Hash(t fs.HashType) (string, error) {
|
|
|
|
|
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
|
|
|
|
|
if t != fs.HashMD5 {
|
|
|
|
|
return "", fs.ErrHashUnsupported
|
|
|
|
|
}
|
|
|
|
|