From 6ad4cc317cc0c3afa685f7645c84e21f1cd00a9e Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Mon, 2 Nov 2020 19:16:12 +0100 Subject: [PATCH] cloud backends: stat and other performance improvements --- dataprovider/dataprovider.go | 2 +- httpd/api_quota.go | 2 +- sftpd/sftpd_test.go | 2 +- vfs/azblobfs.go | 214 ++++++++++++++++---------------- vfs/gcsfs.go | 229 +++++++++++++++++++++++++---------- vfs/osfs.go | 60 ++++----- vfs/s3fs.go | 180 +++++++++++++++++++-------- 7 files changed, 434 insertions(+), 255 deletions(-) diff --git a/dataprovider/dataprovider.go b/dataprovider/dataprovider.go index 174610af..08c836d9 100644 --- a/dataprovider/dataprovider.go +++ b/dataprovider/dataprovider.go @@ -1974,7 +1974,7 @@ func executeAction(operation string, user User) { // after migrating database to v4 we have to update the quota for the imported folders func updateVFoldersQuotaAfterRestore(foldersToScan []string) { - fs := vfs.NewOsFs("", "", nil).(vfs.OsFs) + fs := vfs.NewOsFs("", "", nil).(*vfs.OsFs) for _, folder := range foldersToScan { providerLog(logger.LevelDebug, "starting quota scan after migration for folder %#v", folder) vfolder, err := provider.getFolderByPath(folder) diff --git a/httpd/api_quota.go b/httpd/api_quota.go index cccf42d6..f9a7f571 100644 --- a/httpd/api_quota.go +++ b/httpd/api_quota.go @@ -171,7 +171,7 @@ func doQuotaScan(user dataprovider.User) error { func doFolderQuotaScan(folder vfs.BaseVirtualFolder) error { defer common.QuotaScans.RemoveVFolderQuotaScan(folder.MappedPath) - fs := vfs.NewOsFs("", "", nil).(vfs.OsFs) + fs := vfs.NewOsFs("", "", nil).(*vfs.OsFs) numFiles, size, err := fs.GetDirSize(folder.MappedPath) if err != nil { logger.Warn(logSender, "", "error scanning folder %#v: %v", folder.MappedPath, err) diff --git a/sftpd/sftpd_test.go b/sftpd/sftpd_test.go index 6a803590..0ae96554 100644 --- a/sftpd/sftpd_test.go +++ b/sftpd/sftpd_test.go @@ -5688,7 +5688,7 @@ func TestResolveVirtualPaths(t *testing.T) { }) err := os.MkdirAll(mappedPath, os.ModePerm) assert.NoError(t, err) - osFs := vfs.NewOsFs("", user.GetHomeDir(), user.VirtualFolders).(vfs.OsFs) + osFs := vfs.NewOsFs("", user.GetHomeDir(), user.VirtualFolders).(*vfs.OsFs) b, f := osFs.GetFsPaths("/vdir/a.txt") assert.Equal(t, mappedPath, b) assert.Equal(t, filepath.Join(mappedPath, "a.txt"), f) diff --git a/vfs/azblobfs.go b/vfs/azblobfs.go index 2a5cb55e..1c25dc08 100644 --- a/vfs/azblobfs.go +++ b/vfs/azblobfs.go @@ -51,7 +51,7 @@ func init() { // NewAzBlobFs returns an AzBlobFs object that allows to interact with Azure Blob storage func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, error) { - fs := AzureBlobFs{ + fs := &AzureBlobFs{ connectionID: connectionID, localTempDir: localTempDir, config: config, @@ -68,7 +68,10 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, } fs.config.AccountKey = accountKey } - setConfigDefaults(&fs) + fs.setConfigDefaults() + + version := version.Get() + telemetryValue := fmt.Sprintf("SFTPGo-%v_%v", version.Version, version.CommitHash) if fs.config.SASURL != "" { u, err := url.Parse(fs.config.SASURL) @@ -80,7 +83,7 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, TryTimeout: maxTryTimeout, }, Telemetry: azblob.TelemetryOptions{ - Value: "SFTPGo", + Value: telemetryValue, }, }) // Check if we have container level SAS or account level SAS @@ -123,7 +126,7 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, TryTimeout: maxTryTimeout, }, 
Telemetry: azblob.TelemetryOptions{ - Value: "SFTPGo", + Value: telemetryValue, }, }) serviceURL := azblob.NewServiceURL(*u, pipeline) @@ -132,24 +135,8 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, return fs, nil } -func setConfigDefaults(fs *AzureBlobFs) { - if fs.config.Endpoint == "" { - fs.config.Endpoint = azureDefaultEndpoint - } - if fs.config.UploadPartSize == 0 { - fs.config.UploadPartSize = 4 - } - fs.config.UploadPartSize *= 1024 * 1024 - if fs.config.UploadConcurrency == 0 { - fs.config.UploadConcurrency = 2 - } - if fs.config.AccessTier == "" { - fs.config.AccessTier = string(azblob.AccessTierNone) - } -} - // Name returns the name for the Fs implementation -func (fs AzureBlobFs) Name() string { +func (fs *AzureBlobFs) Name() string { if fs.config.SASURL != "" { return fmt.Sprintf("Azure Blob SAS URL %#v", fs.config.Container) } @@ -157,12 +144,12 @@ func (fs AzureBlobFs) Name() string { } // ConnectionID returns the connection ID associated to this Fs implementation -func (fs AzureBlobFs) ConnectionID() string { +func (fs *AzureBlobFs) ConnectionID() string { return fs.connectionID } // Stat returns a FileInfo describing the named file -func (fs AzureBlobFs) Stat(name string) (os.FileInfo, error) { +func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) { if name == "" || name == "." { err := fs.checkIfBucketExists() if err != nil { @@ -173,59 +160,34 @@ func (fs AzureBlobFs) Stat(name string) (os.FileInfo, error) { if fs.config.KeyPrefix == name+"/" { return NewFileInfo(name, true, 0, time.Now(), false), nil } - prefix := fs.getPrefixForStat(name) - ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) - defer cancelFn() - for marker := (azblob.Marker{}); marker.NotDone(); { - listBlob, err := fs.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{ - Details: azblob.BlobListingDetails{ - Copy: false, - Metadata: false, - Snapshots: false, - UncommittedBlobs: false, - Deleted: false, - }, - Prefix: prefix, - }) - if err != nil { - metrics.AZListObjectsCompleted(err) - return nil, err - } - marker = listBlob.NextMarker - for _, blobPrefix := range listBlob.Segment.BlobPrefixes { - if fs.isEqual(blobPrefix.Name, name) { - metrics.AZListObjectsCompleted(nil) - return NewFileInfo(name, true, 0, time.Now(), false), nil - } - } - for _, blobInfo := range listBlob.Segment.BlobItems { - if fs.isEqual(blobInfo.Name, name) { - isDir := false - if blobInfo.Properties.ContentType != nil { - isDir = (*blobInfo.Properties.ContentType == dirMimeType) - } - size := int64(0) - if blobInfo.Properties.ContentLength != nil { - size = *blobInfo.Properties.ContentLength - } - metrics.AZListObjectsCompleted(nil) - return NewFileInfo(name, isDir, size, blobInfo.Properties.LastModified, false), nil - } - } + attrs, err := fs.headObject(name) + if err == nil { + isDir := (attrs.ContentType() == dirMimeType) + metrics.AZListObjectsCompleted(nil) + return NewFileInfo(name, isDir, attrs.ContentLength(), attrs.LastModified(), false), nil + } + if !fs.IsNotExist(err) { + return nil, err + } + // now check if this is a prefix (virtual directory) + hasContents, err := fs.hasContents(name) + if err != nil { + return nil, err + } + if hasContents { + return NewFileInfo(name, true, 0, time.Now(), false), nil } - - metrics.AZListObjectsCompleted(nil) return nil, errors.New("404 no such file or directory") } // Lstat returns a FileInfo describing the named file -func (fs AzureBlobFs) Lstat(name 
string) (os.FileInfo, error) { +func (fs *AzureBlobFs) Lstat(name string) (os.FileInfo, error) { return fs.Stat(name) } // Open opens the named file for reading -func (fs AzureBlobFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) { +func (fs *AzureBlobFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err @@ -257,7 +219,7 @@ func (fs AzureBlobFs) Open(name string, offset int64) (*os.File, *pipeat.PipeRea } // Create creates or opens the named file for writing -func (fs AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) { +func (fs *AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err @@ -304,7 +266,7 @@ func (fs AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func // rename all the contents too and this could take long time: think // about directories with thousands of files, for each file we should // execute a StartCopyFromURL call. -func (fs AzureBlobFs) Rename(source, target string) error { +func (fs *AzureBlobFs) Rename(source, target string) error { if source == target { return nil } @@ -313,11 +275,11 @@ func (fs AzureBlobFs) Rename(source, target string) error { return err } if fi.IsDir() { - contents, err := fs.ReadDir(source) + hasContents, err := fs.hasContents(source) if err != nil { return err } - if len(contents) > 0 { + if hasContents { return fmt.Errorf("Cannot rename non empty directory: %#v", source) } } @@ -363,13 +325,13 @@ func (fs AzureBlobFs) Rename(source, target string) error { } // Remove removes the named file or (empty) directory. -func (fs AzureBlobFs) Remove(name string, isDir bool) error { +func (fs *AzureBlobFs) Remove(name string, isDir bool) error { if isDir { - contents, err := fs.ReadDir(name) + hasContents, err := fs.hasContents(name) if err != nil { return err } - if len(contents) > 0 { + if hasContents { return fmt.Errorf("Cannot remove non empty directory: %#v", name) } } @@ -383,7 +345,7 @@ func (fs AzureBlobFs) Remove(name string, isDir bool) error { } // Mkdir creates a new directory with the specified name and default permissions -func (fs AzureBlobFs) Mkdir(name string) error { +func (fs *AzureBlobFs) Mkdir(name string) error { _, err := fs.Stat(name) if !fs.IsNotExist(err) { return err @@ -396,43 +358,43 @@ func (fs AzureBlobFs) Mkdir(name string) error { } // Symlink creates source as a symbolic link to target. -func (AzureBlobFs) Symlink(source, target string) error { +func (*AzureBlobFs) Symlink(source, target string) error { return errors.New("403 symlinks are not supported") } // Readlink returns the destination of the named symbolic link -func (AzureBlobFs) Readlink(name string) (string, error) { +func (*AzureBlobFs) Readlink(name string) (string, error) { return "", errors.New("403 readlink is not supported") } // Chown changes the numeric uid and gid of the named file. // Silently ignored. -func (AzureBlobFs) Chown(name string, uid int, gid int) error { +func (*AzureBlobFs) Chown(name string, uid int, gid int) error { return nil } // Chmod changes the mode of the named file to mode. // Silently ignored. -func (AzureBlobFs) Chmod(name string, mode os.FileMode) error { +func (*AzureBlobFs) Chmod(name string, mode os.FileMode) error { return nil } // Chtimes changes the access and modification times of the named file. 
// It is not supported, an error is returned.
-func (AzureBlobFs) Chtimes(name string, atime, mtime time.Time) error {
+func (*AzureBlobFs) Chtimes(name string, atime, mtime time.Time) error {
 	return errors.New("403 chtimes is not supported")
 }
 
 // Truncate changes the size of the named file.
 // Truncate by path is not supported, while truncating an opened
 // file is handled inside base transfer
-func (AzureBlobFs) Truncate(name string, size int64) error {
+func (*AzureBlobFs) Truncate(name string, size int64) error {
 	return errors.New("403 truncate is not supported")
 }
 
 // ReadDir reads the directory named by dirname and returns
 // a list of directory entries.
-func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
+func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 	var result []os.FileInfo
 	// dirname must be already cleaned
 	prefix := ""
@@ -504,20 +466,20 @@ func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 
 // IsUploadResumeSupported returns true if upload resume is supported.
 // Upload Resume is not supported on Azure Blob
-func (AzureBlobFs) IsUploadResumeSupported() bool {
+func (*AzureBlobFs) IsUploadResumeSupported() bool {
 	return false
 }
 
 // IsAtomicUploadSupported returns true if atomic upload is supported.
 // Azure Blob uploads are already atomic, we don't need to upload to a temporary
 // file
-func (AzureBlobFs) IsAtomicUploadSupported() bool {
+func (*AzureBlobFs) IsAtomicUploadSupported() bool {
 	return false
 }
 
 // IsNotExist returns a boolean indicating whether the error is known to
 // report that a file or directory does not exist
-func (AzureBlobFs) IsNotExist(err error) bool {
+func (*AzureBlobFs) IsNotExist(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -537,7 +499,7 @@ func (AzureBlobFs) IsNotExist(err error) bool {
 
 // IsPermission returns a boolean indicating whether the error is known to
 // report that permission is denied.
-func (AzureBlobFs) IsPermission(err error) bool {
+func (*AzureBlobFs) IsPermission(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -556,7 +518,7 @@ func (AzureBlobFs) IsPermission(err error) bool {
 }
 
 // CheckRootPath creates the specified local root directory if it does not exist
-func (fs AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
+func (fs *AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
 	// we need a local directory for temporary files
 	osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
 	return osFs.CheckRootPath(username, uid, gid)
@@ -564,7 +526,7 @@ func (fs AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
 
 // ScanRootDirContents returns the number of files contained in the bucket,
 // and their size
-func (fs AzureBlobFs) ScanRootDirContents() (int, int64, error) {
+func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
 	numFiles := 0
 	size := int64(0)
 
@@ -610,19 +572,19 @@ func (fs AzureBlobFs) ScanRootDirContents() (int, int64, error) {
 
 // GetDirSize returns the number of files and the size for a folder
 // including any subfolders
-func (AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
+func (*AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
 	return 0, 0, errUnsupported
 }
 
 // GetAtomicUploadPath returns the path to use for an atomic upload.
// Azure Blob Storage uploads are already atomic, we never call this method -func (AzureBlobFs) GetAtomicUploadPath(name string) string { +func (*AzureBlobFs) GetAtomicUploadPath(name string) string { return "" } // GetRelativePath returns the path for a file relative to the user's home dir. // This is the path as seen by SFTPGo users -func (fs AzureBlobFs) GetRelativePath(name string) string { +func (fs *AzureBlobFs) GetRelativePath(name string) string { rel := path.Clean(name) if rel == "." { rel = "" @@ -630,7 +592,7 @@ func (fs AzureBlobFs) GetRelativePath(name string) string { if !path.IsAbs(rel) { rel = "/" + rel } - if len(fs.config.KeyPrefix) > 0 { + if fs.config.KeyPrefix != "" { if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) { rel = "/" } @@ -641,7 +603,7 @@ func (fs AzureBlobFs) GetRelativePath(name string) string { // Walk walks the file tree rooted at root, calling walkFn for each file or // directory in the tree, including root -func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error { +func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error { prefix := "" if root != "" && root != "." { prefix = strings.TrimPrefix(root, "/") @@ -673,15 +635,14 @@ func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error { if blobInfo.Properties.ContentType != nil { isDir = (*blobInfo.Properties.ContentType == dirMimeType) } - name := path.Clean(blobInfo.Name) - if len(name) == 0 { + if fs.isEqual(blobInfo.Name, prefix) { continue } blobSize := int64(0) if blobInfo.Properties.ContentLength != nil { blobSize = *blobInfo.Properties.ContentLength } - err = walkFn(blobInfo.Name, NewFileInfo(name, isDir, blobSize, blobInfo.Properties.LastModified, false), nil) + err = walkFn(blobInfo.Name, NewFileInfo(blobInfo.Name, isDir, blobSize, blobInfo.Properties.LastModified, false), nil) if err != nil { return err } @@ -693,31 +654,36 @@ func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error { } // Join joins any number of path elements into a single path -func (AzureBlobFs) Join(elem ...string) string { +func (*AzureBlobFs) Join(elem ...string) string { return strings.TrimPrefix(path.Join(elem...), "/") } // HasVirtualFolders returns true if folders are emulated -func (AzureBlobFs) HasVirtualFolders() bool { +func (*AzureBlobFs) HasVirtualFolders() bool { return true } // ResolvePath returns the matching filesystem path for the specified sftp path -func (fs AzureBlobFs) ResolvePath(virtualPath string) (string, error) { +func (fs *AzureBlobFs) ResolvePath(virtualPath string) (string, error) { if !path.IsAbs(virtualPath) { virtualPath = path.Clean("/" + virtualPath) } return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil } -// GetMimeType implements MimeTyper interface -func (fs AzureBlobFs) GetMimeType(name string) (string, error) { +func (fs *AzureBlobFs) headObject(name string) (*azblob.BlobGetPropertiesResponse, error) { ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() blobBlockURL := fs.containerURL.NewBlockBlobURL(name) response, err := blobBlockURL.GetProperties(ctx, azblob.BlobAccessConditions{}) metrics.AZHeadObjectCompleted(err) + return response, err +} + +// GetMimeType implements MimeTyper interface +func (fs *AzureBlobFs) GetMimeType(name string) (string, error) { + response, err := fs.headObject(name) if err != nil { return "", err } @@ -737,6 +703,22 @@ func (fs *AzureBlobFs) isEqual(key string, virtualName string) bool { return false } +func 
(fs *AzureBlobFs) setConfigDefaults() { + if fs.config.Endpoint == "" { + fs.config.Endpoint = azureDefaultEndpoint + } + if fs.config.UploadPartSize == 0 { + fs.config.UploadPartSize = 4 + } + fs.config.UploadPartSize *= 1024 * 1024 + if fs.config.UploadConcurrency == 0 { + fs.config.UploadConcurrency = 2 + } + if fs.config.AccessTier == "" { + fs.config.AccessTier = string(azblob.AccessTierNone) + } +} + func (fs *AzureBlobFs) checkIfBucketExists() error { ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() @@ -746,17 +728,35 @@ func (fs *AzureBlobFs) checkIfBucketExists() error { return err } -func (fs *AzureBlobFs) getPrefixForStat(name string) string { - prefix := path.Dir(name) - if prefix == "/" || prefix == "." || prefix == "" { - prefix = "" - } else { - prefix = strings.TrimPrefix(prefix, "/") +func (fs *AzureBlobFs) hasContents(name string) (bool, error) { + result := false + prefix := "" + if name != "" && name != "." { + prefix = strings.TrimPrefix(name, "/") if !strings.HasSuffix(prefix, "/") { prefix += "/" } } - return prefix + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) + defer cancelFn() + + listBlob, err := fs.containerURL.ListBlobsFlatSegment(ctx, azblob.Marker{}, azblob.ListBlobsSegmentOptions{ + Details: azblob.BlobListingDetails{ + Copy: false, + Metadata: false, + Snapshots: false, + UncommittedBlobs: false, + Deleted: false, + }, + Prefix: prefix, + MaxResults: 1, + }) + metrics.AZListObjectsCompleted(err) + if err != nil { + return result, err + } + result = len(listBlob.Segment.BlobItems) > 0 + return result, err } func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader, blockBlobURL azblob.BlockBlobURL, diff --git a/vfs/gcsfs.go b/vfs/gcsfs.go index bdf23382..9fc28340 100644 --- a/vfs/gcsfs.go +++ b/vfs/gcsfs.go @@ -27,7 +27,7 @@ import ( ) var ( - gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated"} + gcsDefaultFieldsSelection = []string{"Name", "Size", "Deleted", "Updated", "ContentType"} ) // GCSFs is a Fs implementation for Google Cloud Storage. @@ -47,7 +47,7 @@ func init() { // NewGCSFs returns an GCSFs object that allows to interact with Google Cloud Storage func NewGCSFs(connectionID, localTempDir string, config GCSFsConfig) (Fs, error) { var err error - fs := GCSFs{ + fs := &GCSFs{ connectionID: connectionID, localTempDir: localTempDir, config: config, @@ -69,17 +69,17 @@ func NewGCSFs(connectionID, localTempDir string, config GCSFsConfig) (Fs, error) } // Name returns the name for the Fs implementation -func (fs GCSFs) Name() string { +func (fs *GCSFs) Name() string { return fmt.Sprintf("GCSFs bucket %#v", fs.config.Bucket) } // ConnectionID returns the connection ID associated to this Fs implementation -func (fs GCSFs) ConnectionID() string { +func (fs *GCSFs) ConnectionID() string { return fs.connectionID } // Stat returns a FileInfo describing the named file -func (fs GCSFs) Stat(name string) (os.FileInfo, error) { +func (fs *GCSFs) Stat(name string) (os.FileInfo, error) { var result FileInfo var err error if name == "" || name == "." 
{ @@ -92,9 +92,32 @@ func (fs GCSFs) Stat(name string) (os.FileInfo, error) { if fs.config.KeyPrefix == name+"/" { return NewFileInfo(name, true, 0, time.Now(), false), nil } + attrs, err := fs.headObject(name) + if err == nil { + objSize := attrs.Size + objectModTime := attrs.Updated + isDir := attrs.ContentType == dirMimeType || strings.HasSuffix(attrs.Name, "/") + return NewFileInfo(name, isDir, objSize, objectModTime, false), nil + } + if !fs.IsNotExist(err) { + return result, err + } + // now check if this is a prefix (virtual directory) + hasContents, err := fs.hasContents(name) + if err == nil && hasContents { + return NewFileInfo(name, true, 0, time.Now(), false), nil + } else if err != nil { + return nil, err + } + // search a dir ending with "/" for backward compatibility + return fs.getStatCompat(name) +} + +func (fs *GCSFs) getStatCompat(name string) (os.FileInfo, error) { + var result FileInfo prefix := fs.getPrefixForStat(name) query := &storage.Query{Prefix: prefix, Delimiter: "/"} - err = query.SetAttrSelection(gcsDefaultFieldsSelection) + err := query.SetAttrSelection(gcsDefaultFieldsSelection) if err != nil { return nil, err } @@ -135,12 +158,12 @@ func (fs GCSFs) Stat(name string) (os.FileInfo, error) { } // Lstat returns a FileInfo describing the named file -func (fs GCSFs) Lstat(name string) (os.FileInfo, error) { +func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) { return fs.Stat(name) } // Open opens the named file for reading -func (fs GCSFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) { +func (fs *GCSFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err @@ -171,7 +194,7 @@ func (fs GCSFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, } // Create creates or opens the named file for writing -func (fs GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) { +func (fs *GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err @@ -190,7 +213,7 @@ func (fs GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), er if contentType != "" { objectWriter.ObjectAttrs.ContentType = contentType } - if len(fs.config.StorageClass) > 0 { + if fs.config.StorageClass != "" { objectWriter.ObjectAttrs.StorageClass = fs.config.StorageClass } go func() { @@ -210,7 +233,7 @@ func (fs GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), er // rename all the contents too and this could take long time: think // about directories with thousands of files, for each file we should // execute a CopyObject call. 
-func (fs GCSFs) Rename(source, target string) error {
+func (fs *GCSFs) Rename(source, target string) error {
 	if source == target {
 		return nil
 	}
@@ -219,28 +242,31 @@
 		return err
 	}
 	if fi.IsDir() {
-		contents, err := fs.ReadDir(source)
+		hasContents, err := fs.hasContents(source)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot rename non empty directory: %#v", source)
 		}
-		if !strings.HasSuffix(source, "/") {
-			source += "/"
-		}
-		if !strings.HasSuffix(target, "/") {
-			target += "/"
-		}
 	}
 	src := fs.svc.Bucket(fs.config.Bucket).Object(source)
 	dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 	copier := dst.CopierFrom(src)
-	if len(fs.config.StorageClass) > 0 {
+	if fs.config.StorageClass != "" {
 		copier.StorageClass = fs.config.StorageClass
 	}
+	var contentType string
+	if fi.IsDir() {
+		contentType = dirMimeType
+	} else {
+		contentType = mime.TypeByExtension(path.Ext(source))
+	}
+	if contentType != "" {
+		copier.ContentType = contentType
+	}
 	_, err = copier.Run(ctx)
 	metrics.GCSCopyObjectCompleted(err)
 	if err != nil {
@@ -250,35 +276,35 @@
 }
 
 // Remove removes the named file or (empty) directory.
-func (fs GCSFs) Remove(name string, isDir bool) error {
+func (fs *GCSFs) Remove(name string, isDir bool) error {
 	if isDir {
-		contents, err := fs.ReadDir(name)
+		hasContents, err := fs.hasContents(name)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot remove non empty directory: %#v", name)
 		}
-		if !strings.HasSuffix(name, "/") {
-			name += "/"
-		}
 	}
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
+
 	err := fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
 	metrics.GCSDeleteObjectCompleted(err)
+	if fs.IsNotExist(err) && isDir {
+		name = name + "/"
+		err = fs.svc.Bucket(fs.config.Bucket).Object(name).Delete(ctx)
+		metrics.GCSDeleteObjectCompleted(err)
+	}
 	return err
 }
 
 // Mkdir creates a new directory with the specified name and default permissions
-func (fs GCSFs) Mkdir(name string) error {
+func (fs *GCSFs) Mkdir(name string) error {
 	_, err := fs.Stat(name)
 	if !fs.IsNotExist(err) {
 		return err
 	}
-	if !strings.HasSuffix(name, "/") {
-		name += "/"
-	}
 	_, w, _, err := fs.Create(name, -1)
 	if err != nil {
 		return err
 	}
@@ -287,59 +313,57 @@
 }
 
 // Symlink creates source as a symbolic link to target.
-func (GCSFs) Symlink(source, target string) error {
+func (*GCSFs) Symlink(source, target string) error {
 	return errors.New("403 symlinks are not supported")
 }
 
 // Readlink returns the destination of the named symbolic link
-func (GCSFs) Readlink(name string) (string, error) {
+func (*GCSFs) Readlink(name string) (string, error) {
 	return "", errors.New("403 readlink is not supported")
 }
 
 // Chown changes the numeric uid and gid of the named file.
 // Silently ignored.
-func (GCSFs) Chown(name string, uid int, gid int) error {
+func (*GCSFs) Chown(name string, uid int, gid int) error {
 	return nil
 }
 
 // Chmod changes the mode of the named file to mode.
 // Silently ignored.
-func (GCSFs) Chmod(name string, mode os.FileMode) error {
+func (*GCSFs) Chmod(name string, mode os.FileMode) error {
 	return nil
 }
 
 // Chtimes changes the access and modification times of the named file.
 // It is not supported, an error is returned.
-func (GCSFs) Chtimes(name string, atime, mtime time.Time) error {
+func (*GCSFs) Chtimes(name string, atime, mtime time.Time) error {
 	return errors.New("403 chtimes is not supported")
 }
 
 // Truncate changes the size of the named file.
 // Truncate by path is not supported, while truncating an opened
 // file is handled inside base transfer
-func (GCSFs) Truncate(name string, size int64) error {
+func (*GCSFs) Truncate(name string, size int64) error {
 	return errors.New("403 truncate is not supported")
 }
 
 // ReadDir reads the directory named by dirname and returns
 // a list of directory entries.
-func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
+func (fs *GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 	var result []os.FileInfo
 	// dirname must be already cleaned
-	prefix := ""
-	if dirname != "" && dirname != "." {
-		prefix = strings.TrimPrefix(dirname, "/")
-		if !strings.HasSuffix(prefix, "/") {
-			prefix += "/"
-		}
-	}
+	prefix := fs.getPrefix(dirname)
+
 	query := &storage.Query{Prefix: prefix, Delimiter: "/"}
 	err := query.SetAttrSelection(gcsDefaultFieldsSelection)
 	if err != nil {
 		return nil, err
 	}
+
+	prefixes := make(map[string]bool)
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
+
 	bkt := fs.svc.Bucket(fs.config.Bucket)
 	it := bkt.Objects(ctx, query)
 	for {
@@ -353,15 +377,32 @@ func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 		}
 		if attrs.Prefix != "" {
 			name, _ := fs.resolve(attrs.Prefix, prefix)
+			if name == "" {
+				continue
+			}
+			if _, ok := prefixes[name]; ok {
+				continue
+			}
 			result = append(result, NewFileInfo(name, true, 0, time.Now(), false))
+			prefixes[name] = true
 		} else {
 			name, isDir := fs.resolve(attrs.Name, prefix)
-			if len(name) == 0 {
+			if name == "" {
 				continue
 			}
 			if !attrs.Deleted.IsZero() {
 				continue
 			}
+			if attrs.ContentType == dirMimeType {
+				isDir = true
+			}
+			if isDir {
+				// check if the dir is already included; it will be sent as a blob prefix if it contains at least one item
+				if _, ok := prefixes[name]; ok {
+					continue
+				}
+				prefixes[name] = true
+			}
 			fi := NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false)
 			result = append(result, fi)
 		}
@@ -372,20 +413,20 @@ func (fs GCSFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 
 // IsUploadResumeSupported returns true if upload resume is supported.
 // Upload resume is not supported on GCS
-func (GCSFs) IsUploadResumeSupported() bool {
+func (*GCSFs) IsUploadResumeSupported() bool {
 	return false
 }
 
 // IsAtomicUploadSupported returns true if atomic upload is supported.
 // GCS uploads are already atomic, we don't need to upload to a temporary
 // file
-func (GCSFs) IsAtomicUploadSupported() bool {
+func (*GCSFs) IsAtomicUploadSupported() bool {
 	return false
 }
 
 // IsNotExist returns a boolean indicating whether the error is known to
 // report that a file or directory does not exist
-func (GCSFs) IsNotExist(err error) bool {
+func (*GCSFs) IsNotExist(err error) bool {
 	if err == nil {
 		return false
 	}
@@ -402,7 +443,7 @@ func (GCSFs) IsNotExist(err error) bool {
 
 // IsPermission returns a boolean indicating whether the error is known to
 // report that permission is denied.
-func (GCSFs) IsPermission(err error) bool { +func (*GCSFs) IsPermission(err error) bool { if err == nil { return false } @@ -415,7 +456,7 @@ func (GCSFs) IsPermission(err error) bool { } // CheckRootPath creates the specified local root directory if it does not exists -func (fs GCSFs) CheckRootPath(username string, uid int, gid int) bool { +func (fs *GCSFs) CheckRootPath(username string, uid int, gid int) bool { // we need a local directory for temporary files osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil) return osFs.CheckRootPath(username, uid, gid) @@ -423,7 +464,7 @@ func (fs GCSFs) CheckRootPath(username string, uid int, gid int) bool { // ScanRootDirContents returns the number of files contained in the bucket, // and their size -func (fs GCSFs) ScanRootDirContents() (int, int64, error) { +func (fs *GCSFs) ScanRootDirContents() (int, int64, error) { numFiles := 0 size := int64(0) query := &storage.Query{Prefix: fs.config.KeyPrefix} @@ -447,7 +488,7 @@ func (fs GCSFs) ScanRootDirContents() (int, int64, error) { if !attrs.Deleted.IsZero() { continue } - isDir := strings.HasSuffix(attrs.Name, "/") + isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType if isDir && attrs.Size == 0 { continue } @@ -460,19 +501,19 @@ func (fs GCSFs) ScanRootDirContents() (int, int64, error) { // GetDirSize returns the number of files and the size for a folder // including any subfolders -func (GCSFs) GetDirSize(dirname string) (int, int64, error) { +func (*GCSFs) GetDirSize(dirname string) (int, int64, error) { return 0, 0, errUnsupported } // GetAtomicUploadPath returns the path to use for an atomic upload. // GCS uploads are already atomic, we never call this method for GCS -func (GCSFs) GetAtomicUploadPath(name string) string { +func (*GCSFs) GetAtomicUploadPath(name string) string { return "" } // GetRelativePath returns the path for a file relative to the user's home dir. // This is the path as seen by SFTPGo users -func (fs GCSFs) GetRelativePath(name string) string { +func (fs *GCSFs) GetRelativePath(name string) string { rel := path.Clean(name) if rel == "." { rel = "" @@ -480,7 +521,7 @@ func (fs GCSFs) GetRelativePath(name string) string { if !path.IsAbs(rel) { rel = "/" + rel } - if len(fs.config.KeyPrefix) > 0 { + if fs.config.KeyPrefix != "" { if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) { rel = "/" } @@ -491,7 +532,7 @@ func (fs GCSFs) GetRelativePath(name string) string { // Walk walks the file tree rooted at root, calling walkFn for each file or // directory in the tree, including root -func (fs GCSFs) Walk(root string, walkFn filepath.WalkFunc) error { +func (fs *GCSFs) Walk(root string, walkFn filepath.WalkFunc) error { prefix := "" if root != "" && root != "." 
{ prefix = strings.TrimPrefix(root, "/") @@ -524,11 +565,13 @@ func (fs GCSFs) Walk(root string, walkFn filepath.WalkFunc) error { if !attrs.Deleted.IsZero() { continue } - isDir := strings.HasSuffix(attrs.Name, "/") - name := path.Clean(attrs.Name) - if len(name) == 0 { + name, isDir := fs.resolve(attrs.Name, prefix) + if name == "" { continue } + if attrs.ContentType == dirMimeType { + isDir = true + } err = walkFn(attrs.Name, NewFileInfo(name, isDir, attrs.Size, attrs.Updated, false), nil) if err != nil { return err @@ -541,7 +584,7 @@ func (fs GCSFs) Walk(root string, walkFn filepath.WalkFunc) error { } // Join joins any number of path elements into a single path -func (GCSFs) Join(elem ...string) string { +func (*GCSFs) Join(elem ...string) string { return strings.TrimPrefix(path.Join(elem...), "/") } @@ -551,7 +594,7 @@ func (GCSFs) HasVirtualFolders() bool { } // ResolvePath returns the matching filesystem path for the specified virtual path -func (fs GCSFs) ResolvePath(virtualPath string) (string, error) { +func (fs *GCSFs) ResolvePath(virtualPath string) (string, error) { if !path.IsAbs(virtualPath) { virtualPath = path.Clean("/" + virtualPath) } @@ -589,6 +632,59 @@ func (fs *GCSFs) checkIfBucketExists() error { return err } +func (fs *GCSFs) hasContents(name string) (bool, error) { + result := false + prefix := "" + if name != "" && name != "." { + prefix = strings.TrimPrefix(name, "/") + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + } + query := &storage.Query{Prefix: prefix} + err := query.SetAttrSelection(gcsDefaultFieldsSelection) + if err != nil { + return result, err + } + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout)) + defer cancelFn() + bkt := fs.svc.Bucket(fs.config.Bucket) + it := bkt.Objects(ctx, query) + // if we have a dir object with a trailing slash it will be returned so we set the size to 2 + it.PageInfo().MaxSize = 2 + for { + attrs, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + metrics.GCSListObjectsCompleted(err) + return result, err + } + name, _ := fs.resolve(attrs.Name, prefix) + // a dir object with a trailing slash will result in an empty name + if name == "/" || name == "" { + continue + } + result = true + break + } + + metrics.GCSListObjectsCompleted(err) + return result, nil +} + +func (fs *GCSFs) getPrefix(name string) string { + prefix := "" + if name != "" && name != "." && name != "/" { + prefix = strings.TrimPrefix(name, "/") + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + } + return prefix +} + func (fs *GCSFs) getPrefixForStat(name string) string { prefix := path.Dir(name) if prefix == "/" || prefix == "." 
|| prefix == "" { @@ -602,8 +698,7 @@ func (fs *GCSFs) getPrefixForStat(name string) string { return prefix } -// GetMimeType implements MimeTyper interface -func (fs GCSFs) GetMimeType(name string) (string, error) { +func (fs *GCSFs) headObject(name string) (*storage.ObjectAttrs, error) { ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() @@ -611,6 +706,12 @@ func (fs GCSFs) GetMimeType(name string) (string, error) { obj := bkt.Object(name) attrs, err := obj.Attrs(ctx) metrics.GCSHeadObjectCompleted(err) + return attrs, err +} + +// GetMimeType implements MimeTyper interface +func (fs *GCSFs) GetMimeType(name string) (string, error) { + attrs, err := fs.headObject(name) if err != nil { return "", err } diff --git a/vfs/osfs.go b/vfs/osfs.go index 8fa82b71..314e234f 100644 --- a/vfs/osfs.go +++ b/vfs/osfs.go @@ -30,7 +30,7 @@ type OsFs struct { // NewOsFs returns an OsFs object that allows to interact with local Os filesystem func NewOsFs(connectionID, rootDir string, virtualFolders []VirtualFolder) Fs { - return OsFs{ + return &OsFs{ name: osFsName, connectionID: connectionID, rootDir: rootDir, @@ -39,17 +39,17 @@ func NewOsFs(connectionID, rootDir string, virtualFolders []VirtualFolder) Fs { } // Name returns the name for the Fs implementation -func (fs OsFs) Name() string { +func (fs *OsFs) Name() string { return fs.name } // ConnectionID returns the SSH connection ID associated to this Fs implementation -func (fs OsFs) ConnectionID() string { +func (fs *OsFs) ConnectionID() string { return fs.connectionID } // Stat returns a FileInfo describing the named file -func (fs OsFs) Stat(name string) (os.FileInfo, error) { +func (fs *OsFs) Stat(name string) (os.FileInfo, error) { fi, err := os.Stat(name) if err != nil { return fi, err @@ -64,7 +64,7 @@ func (fs OsFs) Stat(name string) (os.FileInfo, error) { } // Lstat returns a FileInfo describing the named file -func (fs OsFs) Lstat(name string) (os.FileInfo, error) { +func (fs *OsFs) Lstat(name string) (os.FileInfo, error) { fi, err := os.Lstat(name) if err != nil { return fi, err @@ -79,13 +79,13 @@ func (fs OsFs) Lstat(name string) (os.FileInfo, error) { } // Open opens the named file for reading -func (OsFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) { +func (*OsFs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) { f, err := os.Open(name) return f, nil, nil, err } // Create creates or opens the named file for writing -func (OsFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) { +func (*OsFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) { var err error var f *os.File if flag == 0 { @@ -97,28 +97,28 @@ func (OsFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) } // Rename renames (moves) source to target -func (OsFs) Rename(source, target string) error { +func (*OsFs) Rename(source, target string) error { return os.Rename(source, target) } // Remove removes the named file or (empty) directory. -func (OsFs) Remove(name string, isDir bool) error { +func (*OsFs) Remove(name string, isDir bool) error { return os.Remove(name) } // Mkdir creates a new directory with the specified name and default permissions -func (OsFs) Mkdir(name string) error { +func (*OsFs) Mkdir(name string) error { return os.Mkdir(name, os.ModePerm) } // Symlink creates source as a symbolic link to target. 
-func (OsFs) Symlink(source, target string) error { +func (*OsFs) Symlink(source, target string) error { return os.Symlink(source, target) } // Readlink returns the destination of the named symbolic link // as absolute virtual path -func (fs OsFs) Readlink(name string) (string, error) { +func (fs *OsFs) Readlink(name string) (string, error) { p, err := os.Readlink(name) if err != nil { return p, err @@ -127,28 +127,28 @@ func (fs OsFs) Readlink(name string) (string, error) { } // Chown changes the numeric uid and gid of the named file. -func (OsFs) Chown(name string, uid int, gid int) error { +func (*OsFs) Chown(name string, uid int, gid int) error { return os.Chown(name, uid, gid) } // Chmod changes the mode of the named file to mode -func (OsFs) Chmod(name string, mode os.FileMode) error { +func (*OsFs) Chmod(name string, mode os.FileMode) error { return os.Chmod(name, mode) } // Chtimes changes the access and modification times of the named file -func (OsFs) Chtimes(name string, atime, mtime time.Time) error { +func (*OsFs) Chtimes(name string, atime, mtime time.Time) error { return os.Chtimes(name, atime, mtime) } // Truncate changes the size of the named file -func (OsFs) Truncate(name string, size int64) error { +func (*OsFs) Truncate(name string, size int64) error { return os.Truncate(name, size) } // ReadDir reads the directory named by dirname and returns // a list of directory entries. -func (OsFs) ReadDir(dirname string) ([]os.FileInfo, error) { +func (*OsFs) ReadDir(dirname string) ([]os.FileInfo, error) { f, err := os.Open(dirname) if err != nil { return nil, err @@ -162,29 +162,29 @@ func (OsFs) ReadDir(dirname string) ([]os.FileInfo, error) { } // IsUploadResumeSupported returns true if upload resume is supported -func (OsFs) IsUploadResumeSupported() bool { +func (*OsFs) IsUploadResumeSupported() bool { return true } // IsAtomicUploadSupported returns true if atomic upload is supported -func (OsFs) IsAtomicUploadSupported() bool { +func (*OsFs) IsAtomicUploadSupported() bool { return true } // IsNotExist returns a boolean indicating whether the error is known to // report that a file or directory does not exist -func (OsFs) IsNotExist(err error) bool { +func (*OsFs) IsNotExist(err error) bool { return os.IsNotExist(err) } // IsPermission returns a boolean indicating whether the error is known to // report that permission is denied. 
-func (OsFs) IsPermission(err error) bool { +func (*OsFs) IsPermission(err error) bool { return os.IsPermission(err) } // CheckRootPath creates the root directory if it does not exists -func (fs OsFs) CheckRootPath(username string, uid int, gid int) bool { +func (fs *OsFs) CheckRootPath(username string, uid int, gid int) bool { var err error if _, err = fs.Stat(fs.rootDir); fs.IsNotExist(err) { err = os.MkdirAll(fs.rootDir, os.ModePerm) @@ -207,7 +207,7 @@ func (fs OsFs) CheckRootPath(username string, uid int, gid int) bool { // ScanRootDirContents returns the number of files contained in a directory and // their size -func (fs OsFs) ScanRootDirContents() (int, int64, error) { +func (fs *OsFs) ScanRootDirContents() (int, int64, error) { numFiles, size, err := fs.GetDirSize(fs.rootDir) for _, v := range fs.virtualFolders { if !v.IsIncludedInUserQuota() { @@ -228,7 +228,7 @@ func (fs OsFs) ScanRootDirContents() (int, int64, error) { } // GetAtomicUploadPath returns the path to use for an atomic upload -func (OsFs) GetAtomicUploadPath(name string) string { +func (*OsFs) GetAtomicUploadPath(name string) string { dir := filepath.Dir(name) guid := xid.New().String() return filepath.Join(dir, ".sftpgo-upload."+guid+"."+filepath.Base(name)) @@ -236,7 +236,7 @@ func (OsFs) GetAtomicUploadPath(name string) string { // GetRelativePath returns the path for a file relative to the user's home dir. // This is the path as seen by SFTP users -func (fs OsFs) GetRelativePath(name string) string { +func (fs *OsFs) GetRelativePath(name string) string { basePath := fs.rootDir virtualPath := "/" for _, v := range fs.virtualFolders { @@ -258,17 +258,17 @@ func (fs OsFs) GetRelativePath(name string) string { // Walk walks the file tree rooted at root, calling walkFn for each file or // directory in the tree, including root -func (OsFs) Walk(root string, walkFn filepath.WalkFunc) error { +func (*OsFs) Walk(root string, walkFn filepath.WalkFunc) error { return filepath.Walk(root, walkFn) } // Join joins any number of path elements into a single path -func (OsFs) Join(elem ...string) string { +func (*OsFs) Join(elem ...string) string { return filepath.Join(elem...) 
}

// ResolvePath returns the matching filesystem path for the specified sftp path
-func (fs OsFs) ResolvePath(sftpPath string) (string, error) {
+func (fs *OsFs) ResolvePath(sftpPath string) (string, error) {
 	if !filepath.IsAbs(fs.rootDir) {
 		return "", fmt.Errorf("Invalid root path: %v", fs.rootDir)
 	}
@@ -295,7 +295,7 @@ func (fs OsFs) ResolvePath(sftpPath string) (string, error) {
 
 // GetDirSize returns the number of files and the size for a folder
 // including any subfolders
-func (fs OsFs) GetDirSize(dirname string) (int, int64, error) {
+func (fs *OsFs) GetDirSize(dirname string) (int, int64, error) {
 	numFiles := 0
 	size := int64(0)
 	isDir, err := IsDirectory(fs, dirname)
@@ -315,7 +315,7 @@ func (fs OsFs) GetDirSize(dirname string) (int, int64, error) {
 }
 
 // HasVirtualFolders returns true if folders are emulated
-func (OsFs) HasVirtualFolders() bool {
+func (*OsFs) HasVirtualFolders() bool {
 	return false
 }
 
diff --git a/vfs/s3fs.go b/vfs/s3fs.go
index 0ade641c..9a40912a 100644
--- a/vfs/s3fs.go
+++ b/vfs/s3fs.go
@@ -44,7 +44,7 @@ func init() {
 // NewS3Fs returns an S3Fs object that allows to interact with an s3 compatible
 // object storage
 func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
-	fs := S3Fs{
+	fs := &S3Fs{
 		connectionID: connectionID,
 		localTempDir: localTempDir,
 		config:       config,
@@ -96,17 +96,17 @@ func NewS3Fs(connectionID, localTempDir string, config S3FsConfig) (Fs, error) {
 
 // Name returns the name for the Fs implementation
-func (fs S3Fs) Name() string {
+func (fs *S3Fs) Name() string {
 	return fmt.Sprintf("S3Fs bucket %#v", fs.config.Bucket)
 }
 
 // ConnectionID returns the connection ID associated to this Fs implementation
-func (fs S3Fs) ConnectionID() string {
+func (fs *S3Fs) ConnectionID() string {
 	return fs.connectionID
 }
 
 // Stat returns a FileInfo describing the named file
-func (fs S3Fs) Stat(name string) (os.FileInfo, error) {
+func (fs *S3Fs) Stat(name string) (os.FileInfo, error) {
 	var result FileInfo
 	if name == "/" || name == "." {
 		err := fs.checkIfBucketExists()
@@ -118,6 +118,30 @@
 	if "/"+fs.config.KeyPrefix == name+"/" {
 		return NewFileInfo(name, true, 0, time.Now(), false), nil
 	}
+	obj, err := fs.headObject(name)
+	if err == nil {
+		objSize := *obj.ContentLength
+		objectModTime := *obj.LastModified
+		return NewFileInfo(name, false, objSize, objectModTime, false), nil
+	}
+	if !fs.IsNotExist(err) {
+		return result, err
+	}
+	// now check if this is a prefix (virtual directory)
+	hasContents, err := fs.hasContents(name)
+	if err == nil && hasContents {
+		return NewFileInfo(name, true, 0, time.Now(), false), nil
+	} else if err != nil {
+		return nil, err
+	}
+	// the requested file could still be a directory as a zero-byte key
+	// with a trailing slash.
+	// As a last resort we do a list dir to find it
+	return fs.getStatCompat(name)
+}
+
+func (fs *S3Fs) getStatCompat(name string) (os.FileInfo, error) {
+	var result FileInfo
 	prefix := path.Dir(name)
 	if prefix == "/" || prefix == "." 
{ prefix = "" @@ -129,6 +153,7 @@ func (fs S3Fs) Stat(name string) (os.FileInfo, error) { } ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() + err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{ Bucket: aws.String(fs.config.Bucket), Prefix: aws.String(prefix), @@ -144,7 +169,7 @@ func (fs S3Fs) Stat(name string) (os.FileInfo, error) { if fs.isEqual(fileObject.Key, name) { objectSize := *fileObject.Size objectModTime := *fileObject.LastModified - isDir := strings.HasSuffix(*fileObject.Key, "/") + isDir := strings.HasSuffix(*fileObject.Key, "/") && objectSize == 0 result = NewFileInfo(name, isDir, objectSize, objectModTime, false) return false } @@ -159,12 +184,12 @@ func (fs S3Fs) Stat(name string) (os.FileInfo, error) { } // Lstat returns a FileInfo describing the named file -func (fs S3Fs) Lstat(name string) (os.FileInfo, error) { +func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) { return fs.Stat(name) } // Open opens the named file for reading -func (fs S3Fs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) { +func (fs *S3Fs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err @@ -191,7 +216,7 @@ func (fs S3Fs) Open(name string, offset int64) (*os.File, *pipeat.PipeReaderAt, } // Create creates or opens the named file for writing -func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) { +func (fs *S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) { r, w, err := pipeat.PipeInDir(fs.localTempDir) if err != nil { return nil, nil, nil, err @@ -209,11 +234,11 @@ func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), err contentType = mime.TypeByExtension(path.Ext(name)) } response, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{ - Bucket: aws.String(fs.config.Bucket), - Key: aws.String(key), - Body: r, - StorageClass: utils.NilIfEmpty(fs.config.StorageClass), - ContentEncoding: utils.NilIfEmpty(contentType), + Bucket: aws.String(fs.config.Bucket), + Key: aws.String(key), + Body: r, + StorageClass: utils.NilIfEmpty(fs.config.StorageClass), + ContentType: utils.NilIfEmpty(contentType), }, func(u *s3manager.Uploader) { u.Concurrency = fs.config.UploadConcurrency u.PartSize = fs.config.UploadPartSize @@ -237,7 +262,7 @@ func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), err // // https://github.com/aws/aws-sdk-go/pull/2653 // -func (fs S3Fs) Rename(source, target string) error { +func (fs *S3Fs) Rename(source, target string) error { if source == target { return nil } @@ -247,11 +272,11 @@ func (fs S3Fs) Rename(source, target string) error { } copySource := fs.Join(fs.config.Bucket, source) if fi.IsDir() { - contents, err := fs.ReadDir(source) + hasContents, err := fs.hasContents(source) if err != nil { return err } - if len(contents) > 0 { + if hasContents { return fmt.Errorf("Cannot rename non empty directory: %#v", source) } if !strings.HasSuffix(copySource, "/") { @@ -261,12 +286,20 @@ func (fs S3Fs) Rename(source, target string) error { target += "/" } } + var contentType string + if fi.IsDir() { + contentType = dirMimeType + } else { + contentType = mime.TypeByExtension(path.Ext(source)) + } ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() _, err = fs.svc.CopyObjectWithContext(ctx, 
&s3.CopyObjectInput{
-		Bucket:     aws.String(fs.config.Bucket),
-		CopySource: aws.String(copySource),
-		Key:        aws.String(target),
+		Bucket:       aws.String(fs.config.Bucket),
+		CopySource:   aws.String(copySource),
+		Key:          aws.String(target),
+		StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
+		ContentType:  utils.NilIfEmpty(contentType),
 	})
 	metrics.S3CopyObjectCompleted(err)
 	if err != nil {
@@ -276,13 +309,13 @@
 }
 
 // Remove removes the named file or (empty) directory.
-func (fs S3Fs) Remove(name string, isDir bool) error {
+func (fs *S3Fs) Remove(name string, isDir bool) error {
 	if isDir {
-		contents, err := fs.ReadDir(name)
+		hasContents, err := fs.hasContents(name)
 		if err != nil {
 			return err
 		}
-		if len(contents) > 0 {
+		if hasContents {
 			return fmt.Errorf("Cannot remove non empty directory: %#v", name)
 		}
 		if !strings.HasSuffix(name, "/") {
@@ -300,7 +333,7 @@
 }
 
 // Mkdir creates a new directory with the specified name and default permissions
-func (fs S3Fs) Mkdir(name string) error {
+func (fs *S3Fs) Mkdir(name string) error {
 	_, err := fs.Stat(name)
 	if !fs.IsNotExist(err) {
 		return err
 	}
@@ -316,43 +349,43 @@
 }
 
 // Symlink creates source as a symbolic link to target.
-func (S3Fs) Symlink(source, target string) error {
+func (*S3Fs) Symlink(source, target string) error {
 	return errors.New("403 symlinks are not supported")
 }
 
 // Readlink returns the destination of the named symbolic link
-func (S3Fs) Readlink(name string) (string, error) {
+func (*S3Fs) Readlink(name string) (string, error) {
 	return "", errors.New("403 readlink is not supported")
 }
 
 // Chown changes the numeric uid and gid of the named file.
 // Silently ignored.
-func (S3Fs) Chown(name string, uid int, gid int) error {
+func (*S3Fs) Chown(name string, uid int, gid int) error {
 	return nil
 }
 
 // Chmod changes the mode of the named file to mode.
 // Silently ignored.
-func (S3Fs) Chmod(name string, mode os.FileMode) error {
+func (*S3Fs) Chmod(name string, mode os.FileMode) error {
 	return nil
 }
 
 // Chtimes changes the access and modification times of the named file.
 // It is not supported, an error is returned.
-func (S3Fs) Chtimes(name string, atime, mtime time.Time) error {
+func (*S3Fs) Chtimes(name string, atime, mtime time.Time) error {
 	return errors.New("403 chtimes is not supported")
}
 
 // Truncate changes the size of the named file.
 // Truncate by path is not supported, while truncating an opened
 // file is handled inside base transfer
-func (S3Fs) Truncate(name string, size int64) error {
+func (*S3Fs) Truncate(name string, size int64) error {
 	return errors.New("403 truncate is not supported")
 }
 
 // ReadDir reads the directory named by dirname and returns
 // a list of directory entries.
-func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) { +func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) { var result []os.FileInfo // dirname must be already cleaned prefix := "" @@ -362,6 +395,9 @@ func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) { prefix += "/" } } + + prefixes := make(map[string]bool) + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() err := fs.svc.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{ @@ -370,17 +406,31 @@ func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) { Delimiter: aws.String("/"), }, func(page *s3.ListObjectsV2Output, lastPage bool) bool { for _, p := range page.CommonPrefixes { - name, isDir := fs.resolve(p.Prefix, prefix) - result = append(result, NewFileInfo(name, isDir, 0, time.Now(), false)) + // prefixes have a trailing slash + name, _ := fs.resolve(p.Prefix, prefix) + if name == "" { + continue + } + if _, ok := prefixes[name]; ok { + continue + } + result = append(result, NewFileInfo(name, true, 0, time.Now(), false)) + prefixes[name] = true } for _, fileObject := range page.Contents { objectSize := *fileObject.Size objectModTime := *fileObject.LastModified name, isDir := fs.resolve(fileObject.Key, prefix) - if len(name) == 0 { + if name == "" { continue } - result = append(result, NewFileInfo(name, isDir, objectSize, objectModTime, false)) + if isDir { + if _, ok := prefixes[name]; ok { + continue + } + prefixes[name] = true + } + result = append(result, NewFileInfo(name, (isDir && objectSize == 0), objectSize, objectModTime, false)) } return true }) @@ -390,20 +440,20 @@ func (fs S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) { // IsUploadResumeSupported returns true if upload resume is supported. // SFTP Resume is not supported on S3 -func (S3Fs) IsUploadResumeSupported() bool { +func (*S3Fs) IsUploadResumeSupported() bool { return false } // IsAtomicUploadSupported returns true if atomic upload is supported. // S3 uploads are already atomic, we don't need to upload to a temporary // file -func (S3Fs) IsAtomicUploadSupported() bool { +func (*S3Fs) IsAtomicUploadSupported() bool { return false } // IsNotExist returns a boolean indicating whether the error is known to // report that a file or directory does not exist -func (S3Fs) IsNotExist(err error) bool { +func (*S3Fs) IsNotExist(err error) bool { if err == nil { return false } @@ -428,7 +478,7 @@ func (S3Fs) IsNotExist(err error) bool { // IsPermission returns a boolean indicating whether the error is known to // report that permission is denied. 
-func (S3Fs) IsPermission(err error) bool { +func (*S3Fs) IsPermission(err error) bool { if err == nil { return false } @@ -436,7 +486,7 @@ func (S3Fs) IsPermission(err error) bool { } // CheckRootPath creates the specified local root directory if it does not exists -func (fs S3Fs) CheckRootPath(username string, uid int, gid int) bool { +func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool { // we need a local directory for temporary files osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil) return osFs.CheckRootPath(username, uid, gid) @@ -444,7 +494,7 @@ func (fs S3Fs) CheckRootPath(username string, uid int, gid int) bool { // ScanRootDirContents returns the number of files contained in the bucket, // and their size -func (fs S3Fs) ScanRootDirContents() (int, int64, error) { +func (fs *S3Fs) ScanRootDirContents() (int, int64, error) { numFiles := 0 size := int64(0) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout)) @@ -469,19 +519,19 @@ func (fs S3Fs) ScanRootDirContents() (int, int64, error) { // GetDirSize returns the number of files and the size for a folder // including any subfolders -func (S3Fs) GetDirSize(dirname string) (int, int64, error) { +func (*S3Fs) GetDirSize(dirname string) (int, int64, error) { return 0, 0, errUnsupported } // GetAtomicUploadPath returns the path to use for an atomic upload. // S3 uploads are already atomic, we never call this method for S3 -func (S3Fs) GetAtomicUploadPath(name string) string { +func (*S3Fs) GetAtomicUploadPath(name string) string { return "" } // GetRelativePath returns the path for a file relative to the user's home dir. // This is the path as seen by SFTPGo users -func (fs S3Fs) GetRelativePath(name string) string { +func (fs *S3Fs) GetRelativePath(name string) string { rel := path.Clean(name) if rel == "." { rel = "" @@ -489,7 +539,7 @@ func (fs S3Fs) GetRelativePath(name string) string { if !strings.HasPrefix(rel, "/") { return "/" + rel } - if len(fs.config.KeyPrefix) > 0 { + if fs.config.KeyPrefix != "" { if !strings.HasPrefix(rel, "/"+fs.config.KeyPrefix) { rel = "/" } @@ -500,7 +550,7 @@ func (fs S3Fs) GetRelativePath(name string) string { // Walk walks the file tree rooted at root, calling walkFn for each file or // directory in the tree, including root. The result are unordered -func (fs S3Fs) Walk(root string, walkFn filepath.WalkFunc) error { +func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error { prefix := "" if root != "/" && root != "." { prefix = strings.TrimPrefix(root, "/") @@ -519,7 +569,7 @@ func (fs S3Fs) Walk(root string, walkFn filepath.WalkFunc) error { objectModTime := *fileObject.LastModified isDir := strings.HasSuffix(*fileObject.Key, "/") name := path.Clean(*fileObject.Key) - if len(name) == 0 { + if name == "/" || name == "." { continue } err := walkFn(fs.Join("/", *fileObject.Key), NewFileInfo(name, isDir, objectSize, objectModTime, false), nil) @@ -536,17 +586,17 @@ func (fs S3Fs) Walk(root string, walkFn filepath.WalkFunc) error { } // Join joins any number of path elements into a single path -func (S3Fs) Join(elem ...string) string { +func (*S3Fs) Join(elem ...string) string { return path.Join(elem...) 
} // HasVirtualFolders returns true if folders are emulated -func (S3Fs) HasVirtualFolders() bool { +func (*S3Fs) HasVirtualFolders() bool { return true } // ResolvePath returns the matching filesystem path for the specified virtual path -func (fs S3Fs) ResolvePath(virtualPath string) (string, error) { +func (fs *S3Fs) ResolvePath(virtualPath string) (string, error) { if !path.IsAbs(virtualPath) { virtualPath = path.Clean("/" + virtualPath) } @@ -590,8 +640,30 @@ func (fs *S3Fs) checkIfBucketExists() error { return err } -// GetMimeType implements MimeTyper interface -func (fs S3Fs) GetMimeType(name string) (string, error) { +func (fs *S3Fs) hasContents(name string) (bool, error) { + prefix := "" + if name != "/" && name != "." { + prefix = strings.TrimPrefix(name, "/") + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + } + maxResults := int64(1) + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) + defer cancelFn() + results, err := fs.svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(fs.config.Bucket), + Prefix: aws.String(prefix), + MaxKeys: &maxResults, + }) + metrics.S3ListObjectsCompleted(err) + if err != nil { + return false, err + } + return len(results.Contents) > 0, nil +} + +func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) { ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) defer cancelFn() obj, err := fs.svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ @@ -599,6 +671,12 @@ func (fs S3Fs) GetMimeType(name string) (string, error) { Key: aws.String(name), }) metrics.S3HeadObjectCompleted(err) + return obj, err +} + +// GetMimeType implements MimeTyper interface +func (fs *S3Fs) GetMimeType(name string) (string, error) { + obj, err := fs.headObject(name) if err != nil { return "", err }
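
The .(*vfs.OsFs) assertion changes in dataprovider, httpd and sftpd_test follow directly from the move to pointer receivers: once every OsFs method takes *OsFs, only the pointer type satisfies the vfs.Fs interface, so that is the concrete type stored in the value returned by NewOsFs. A minimal standalone sketch of the rule (Fs and OsFs here are local stand-ins, not the real vfs types):

package main

import "fmt"

// Fs stands in for the vfs.Fs interface implemented by all backends.
type Fs interface {
	Name() string
}

// OsFs stands in for vfs.OsFs after this patch: pointer receivers only.
type OsFs struct{ name string }

func (fs *OsFs) Name() string { return fs.name }

func main() {
	var fs Fs = &OsFs{name: "osfs"} // only *OsFs implements Fs now

	// The dynamic type stored in the interface is *OsFs, so the
	// assertion must name the pointer type, exactly like the
	// .(*vfs.OsFs) changes above.
	if osFs, ok := fs.(*OsFs); ok {
		fmt.Println("asserted:", osFs.Name())
	}

	// fs.(OsFs) would not even compile here: the value type OsFs does
	// not implement Fs when Name has a pointer receiver.
}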
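
The backends above replace full ReadDir calls in Stat, Rename and Remove with a hasContents helper that lists at most one key under the prefix, which is where most of the speedup comes from. A rough sketch of the same idea against aws-sdk-go outside SFTPGo (the bucket name and the helper itself are hypothetical):

package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// hasContents reports whether any object exists under the given prefix.
// Listing with MaxKeys=1 returns after the first match instead of
// paging through the whole virtual directory as a ReadDir would.
func hasContents(svc *s3.S3, bucket, name string) (bool, error) {
	prefix := strings.TrimPrefix(name, "/")
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	out, err := svc.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
		Bucket:  aws.String(bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: aws.Int64(1),
	})
	if err != nil {
		return false, err
	}
	return len(out.Contents) > 0, nil
}

func main() {
	sess := session.Must(session.NewSession())
	svc := s3.New(sess)
	// "my-bucket" and "somedir" are placeholders.
	ok, err := hasContents(svc, "my-bucket", "somedir")
	fmt.Println(ok, err)
}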
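
The reworked Stat implementations are HEAD first, prefix listing second: ask the object store for the key's attributes directly and only fall back to the directory checks when the key is missing, instead of always paging through a hierarchical listing. A sketch of the GCS variant using cloud.google.com/go/storage (the bucket name is a placeholder and error handling is trimmed):

package main

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/storage"
)

// statObject resolves a name the way the new Stat does: one attributes
// request for the common case, with the virtual-directory fallback left
// to the caller when the object does not exist.
func statObject(client *storage.Client, name string) (*storage.ObjectAttrs, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	attrs, err := client.Bucket("my-bucket").Object(name).Attrs(ctx)
	if err == storage.ErrObjectNotExist {
		// Not a real object: the caller would now list "name/" with a
		// page size of 1 to see if it is a non-empty virtual directory.
		return nil, err
	}
	return attrs, err
}

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		panic(err)
	}
	attrs, err := statObject(client, "somefile.txt")
	fmt.Println(attrs, err)
}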
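
The ContentType comparisons against dirMimeType rely on directories being emulated as zero-byte marker objects whose content type identifies them, which is why GCS Mkdir above no longer needs to append a trailing "/" to the object name. A sketch of creating such a marker (the "inode/directory" value is an assumption in this sketch; the actual constant lives in the vfs package, and "my-bucket" is a placeholder):

package main

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/storage"
)

// dirMimeType marks directory placeholder objects in this sketch; the
// patch compares attrs.ContentType against a constant with the same role.
const dirMimeType = "inode/directory"

// mkdir creates a zero-byte marker object so that Stat can recognize
// the key as a directory from its content type alone.
func mkdir(client *storage.Client, name string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	w := client.Bucket("my-bucket").Object(name).NewWriter(ctx)
	w.ObjectAttrs.ContentType = dirMimeType
	// closing the writer without writing any data creates the empty
	// marker object
	return w.Close()
}

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(mkdir(client, "newdir"))
}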