From 4a0a42c2f111443891fcf19f65f8dbc80aa45fb6 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood <nick@craig-wood.com>
Date: Sat, 26 Sep 2015 12:51:05 +0100
Subject: [PATCH] swift: large file upload fixes

  * Read metadata in file listing for 0 length files to fix syncs
  * Ignore non-existent files in isManifestFile to fix errors on copy
  * remove nsToSwiftFloatString - experiments with the swift program
    indicate that it puts a variable number of points after the
    decimal, so might as well use the one in the swift library.
  * Make sure segments get deleted properly when move from segmented
    to non segmented and vice versa
  * Use internal list routine to detect errors on listing
  * Remove the _segments container if possible
  * Remove manifest first when deleting
---
 swift/swift.go | 211 ++++++++++++++++++++++++++++++-------------------
 1 file changed, 130 insertions(+), 81 deletions(-)

diff --git a/swift/swift.go b/swift/swift.go
index f052fa2ad..a6d94f820 100644
--- a/swift/swift.go
+++ b/swift/swift.go
@@ -66,10 +66,11 @@ func init() {
 
 // FsSwift represents a remote swift server
 type FsSwift struct {
-	name      string           // name of this remote
-	c         swift.Connection // the connection to the swift server
-	container string           // the container we are working on
-	root      string           // the path we are working on if any
+	name              string           // name of this remote
+	c                 swift.Connection // the connection to the swift server
+	container         string           // the container we are working on
+	segmentsContainer string           // container to store the segments (if any) in
+	root              string           // the path we are working on if any
 }
 
 // FsObjectSwift describes a swift object
@@ -163,10 +164,11 @@ func NewFs(name, root string) (fs.Fs, error) {
 		return nil, err
 	}
 	f := &FsSwift{
-		name:      name,
-		c:         *c,
-		container: container,
-		root:      directory,
+		name:              name,
+		c:                 *c,
+		container:         container,
+		segmentsContainer: container + "_segments",
+		root:              directory,
 	}
 	if f.root != "" {
 		f.root += "/"
@@ -216,21 +218,25 @@ func (f *FsSwift) NewFsObject(remote string) fs.Object {
 	return f.newFsObjectWithInfo(remote, nil)
 }
 
-// list the objects into the function supplied
+// listFn is called from list and listContainerRoot to handle an object
+type listFn func(string, *swift.Object) error
+
+// listContainerRoot lists the objects into the function supplied from
+// the container and root supplied
 //
 // If directories is set it only sends directories
-func (f *FsSwift) list(directories bool, fn func(string, *swift.Object)) {
+func (f *FsSwift) listContainerRoot(container, root string, directories bool, fn listFn) error {
 	// Options for ObjectsWalk
 	opts := swift.ObjectsOpts{
-		Prefix: f.root,
+		Prefix: root,
 		Limit:  256,
 	}
 	if directories {
 		opts.Delimiter = '/'
 	}
-	rootLength := len(f.root)
-	err := f.c.ObjectsWalk(f.container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
-		objects, err := f.c.Objects(f.container, opts)
+	rootLength := len(root)
+	return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
+		objects, err := f.c.Objects(container, opts)
 		if err == nil {
 			for i := range objects {
 				object := &objects[i]
@@ -241,16 +247,26 @@ func (f *FsSwift) list(directories bool, fn func(string, *swift.Object)) {
 					}
 					object.Name = object.Name[:len(object.Name)-1]
 				}
-				if !strings.HasPrefix(object.Name, f.root) {
+				if !strings.HasPrefix(object.Name, root) {
 					fs.Log(f, "Odd name received %q", object.Name)
 					continue
 				}
 				remote := object.Name[rootLength:]
-				fn(remote, object)
+				err = fn(remote, object)
+				if err != nil {
+					break
+				}
 			}
 		}
 		return objects, err
 	})
+}
+
+// list the objects into the function supplied
+//
+// If directories is set it only sends directories
+func (f *FsSwift) list(directories bool, fn listFn) {
+	err := f.listContainerRoot(f.container, f.root, directories, fn)
 	if err != nil {
 		fs.Stats.Error()
 		fs.ErrorLog(f, "Couldn't read container %q: %s", f.container, err)
@@ -269,10 +285,18 @@ func (f *FsSwift) List() fs.ObjectsChan {
 		// List the objects
 		go func() {
 			defer close(out)
-			f.list(false, func(remote string, object *swift.Object) {
-				if fs := f.newFsObjectWithInfo(remote, object); fs != nil {
-					out <- fs
+			f.list(false, func(remote string, object *swift.Object) error {
+				if o := f.newFsObjectWithInfo(remote, object); o != nil {
+					// Do full metadata read on 0 size objects which might be manifest files
+					if o.Size() == 0 {
+						err := o.(*FsObjectSwift).readMetaData()
+						if err != nil {
+							fs.Debug(o, "Failed to read metadata: %v", err)
+						}
+					}
+					out <- o
 				}
+				return nil
 			})
 		}()
 	}
@@ -304,12 +328,13 @@ func (f *FsSwift) ListDir() fs.DirChan {
 		// List the directories in the path in the container
 		go func() {
 			defer close(out)
-			f.list(true, func(remote string, object *swift.Object) {
+			f.list(true, func(remote string, object *swift.Object) error {
 				out <- &fs.Dir{
 					Name:  remote,
 					Bytes: object.Bytes,
 					Count: 0,
 				}
+				return nil
 			})
 		}()
 	}
@@ -394,7 +419,7 @@ func (o *FsObjectSwift) Md5sum() (string, error) {
 		return "", err
 	}
 	if isManifest {
-		fs.Debug(o, "Return empty md5 for swift manifest file. Md5 of manifest file calculate as md5 of md5 of it's parts, so it's not original md5")
+		fs.Debug(o, "Returning empty Md5sum for swift manifest file")
 		return "", nil
 	}
 	return strings.ToLower(o.info.Hash), nil
@@ -404,6 +429,9 @@ func (o *FsObjectSwift) Md5sum() (string, error) {
 func (o *FsObjectSwift) isManifestFile() (bool, error) {
 	err := o.readMetaData()
 	if err != nil {
+		if err == swift.ObjectNotFound {
+			return false, nil
+		}
 		return false, err
 	}
 	_, isManifestFile := (*o.headers)["X-Object-Manifest"]
@@ -491,76 +519,100 @@ func min(x, y int64) int64 {
 	return y
 }
 
-// nsToSwiftFloatString turns a number of ns into a floating point
-// string in seconds the same way as the "swift" tool
-func nsToSwiftFloatString(ns int64) string {
-	if ns < 0 {
-		return "-" + nsToSwiftFloatString(-ns)
+// removeSegments removes any old segments from o
+//
+// if except is passed in then segments with that prefix won't be deleted
+func (o *FsObjectSwift) removeSegments(except string) error {
+	segmentsRoot := o.swift.root + o.remote + "/"
+	err := o.swift.listContainerRoot(o.swift.segmentsContainer, segmentsRoot, false, func(remote string, object *swift.Object) error {
+		if except != "" && strings.HasPrefix(remote, except) {
+			// fs.Debug(o, "Ignoring current segment file %q in container %q", segmentsRoot+remote, o.swift.segmentsContainer)
+			return nil
+		}
+		segmentPath := segmentsRoot + remote
+		fs.Debug(o, "Removing segment file %q in container %q", segmentPath, o.swift.segmentsContainer)
+		return o.swift.c.ObjectDelete(o.swift.segmentsContainer, segmentPath)
+	})
+	if err != nil {
+		return err
 	}
-	result := fmt.Sprintf("%010d", ns)
-	split := len(result) - 9
-	result, decimals := result[:split], result[split:split+2]
-	if decimals != "" {
-		result += "."
-		result += decimals
+	// remove the segments container if empty, ignore errors
+	err = o.swift.c.ContainerDelete(o.swift.segmentsContainer)
+	if err == nil {
+		fs.Debug(o, "Removed empty container %q", o.swift.segmentsContainer)
 	}
-	return result
+	return nil
+}
+
+// updateChunks updates the existing object using chunks to a separate
+// container.  It returns a string which prefixes current segments.
+func (o *FsObjectSwift) updateChunks(in io.Reader, headers swift.Headers, size int64) (string, error) {
+	// Create the segmentsContainer if it doesn't exist
+	err := o.swift.c.ContainerCreate(o.swift.segmentsContainer, nil)
+	if err != nil {
+		return "", err
+	}
+	// Upload the chunks
+	left := size
+	i := 0
+	uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
+	segmentsPath := fmt.Sprintf("%s%s/%s", o.swift.root, o.remote, uniquePrefix)
+	for left > 0 {
+		n := min(left, int64(chunkSize))
+		segmentReader := io.LimitReader(in, n)
+		segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
+		fs.Debug(o, "Uploading segment file %q into %q", segmentPath, o.swift.segmentsContainer)
+		_, err := o.swift.c.ObjectPut(o.swift.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
+		if err != nil {
+			return "", err
+		}
+		left -= n
+		i++
+	}
+	// Upload the manifest
+	headers["X-Object-Manifest"] = fmt.Sprintf("%s/%s", o.swift.segmentsContainer, segmentsPath)
+	emptyReader := bytes.NewReader(nil)
+	manifestName := o.swift.root + o.remote
+	_, err = o.swift.c.ObjectPut(o.swift.container, manifestName, emptyReader, true, "", "", headers)
+	return uniquePrefix + "/", err
 }
 
 // Update the object with the contents of the io.Reader, modTime and size
 //
 // The new object may have been created if an error is returned
 func (o *FsObjectSwift) Update(in io.Reader, modTime time.Time, size int64) error {
+	// Note whether this has a manifest before starting
+	isManifest, err := o.isManifestFile()
+	if err != nil {
+		return err
+	}
+
 	// Set the mtime
 	m := swift.Metadata{}
 	m.SetModTime(modTime)
+	headers := m.ObjectHeaders()
+	uniquePrefix := ""
 	if size > int64(chunkSize) {
-		segmentsContainerName := o.swift.container + "_segments"
-		left := size
-		i := 0
-		nowFloat := nsToSwiftFloatString(time.Now().UnixNano())
-		for left > 0 {
-			n := min(left, int64(chunkSize))
-			segmentReader := io.LimitReader(in, n)
-			segmentPath := fmt.Sprintf("%s%s/%s/%d/%08d", o.swift.root, o.remote, nowFloat, size, i)
-			_, err := o.swift.c.ObjectPut(segmentsContainerName, segmentPath, segmentReader, true, "", "", m.ObjectHeaders())
-			if err != nil {
-				return err
-			}
-			left -= n
-			i++
-		}
-		manifestHeaders := swift.Headers{"X-Object-Manifest": fmt.Sprintf("%s/%s%s/%s/%d", segmentsContainerName, o.swift.root, o.remote, nowFloat, size)}
-		for k, v := range m.ObjectHeaders() {
-			manifestHeaders[k] = v
-		}
-		emptyReader := bytes.NewReader(nil)
-		manifestName := o.swift.root + o.remote
-		_, err := o.swift.c.ObjectPut(o.swift.container, manifestName, emptyReader, true, "", "", manifestHeaders)
+		uniquePrefix, err = o.updateChunks(in, headers, size)
 		if err != nil {
 			return err
 		}
-		// remove old segments
-		segmentsPath := fmt.Sprintf("%s/%s%s/", segmentsContainerName, o.swift.root, o.remote)
-		segmentsFs, err := NewFs(o.swift.name, segmentsPath)
-		if err != nil {
-			return err
-		}
-		for o := range segmentsFs.List() {
-			if !strings.HasPrefix(o.Remote(), nowFloat) {
-				fs.Log(o, "Remove old file segment '%s'", o.Remote())
-				err := o.Remove()
-				if err != nil {
-					return err
-				}
-			}
-		}
 	} else {
+		headers["X-Object-Manifest"] = "" // remove manifest
 		_, err := o.swift.c.ObjectPut(o.swift.container, o.swift.root+o.remote, in, true, "", "", m.ObjectHeaders())
 		if err != nil {
 			return err
 		}
 	}
+
+	// If file was a manifest then remove old/all segments
+	if isManifest {
+		err = o.removeSegments(uniquePrefix)
+		if err != nil {
+			fs.Log(o, "Failed to remove old segments - carrying on with upload: %v", err)
+		}
+	}
+
 	// Read the metadata from the newly created object
 	o.headers = nil // wipe old metadata
 	return o.readMetaData()
@@ -572,22 +624,19 @@ func (o *FsObjectSwift) Remove() error {
 	if err != nil {
 		return err
 	}
+	// Remove file/manifest first
+	err = o.swift.c.ObjectDelete(o.swift.container, o.swift.root+o.remote)
+	if err != nil {
+		return err
+	}
+	// ...then segments if required
 	if isManifestFile {
-		// remove segments
-		segmentsContainerName := o.swift.container + "_segments"
-		segmentsPath := fmt.Sprintf("%s/%s%s/", segmentsContainerName, o.swift.root, o.remote)
-		segmentsFs, err := NewFs(o.swift.name, segmentsPath)
+		err = o.removeSegments("")
 		if err != nil {
 			return err
 		}
-		for o := range segmentsFs.List() {
-			err := o.Remove()
-			if err != nil {
-				return err
-			}
-		}
 	}
-	return o.swift.c.ObjectDelete(o.swift.container, o.swift.root+o.remote)
+	return nil
 }
 
 // Check the interfaces are satisfied