From 930ff266f2ea3ef2824860099c681c4e4351a4f9 Mon Sep 17 00:00:00 2001
From: Stefan Breunig
Date: Mon, 1 May 2017 14:38:41 +0200
Subject: [PATCH] compare checksums on upload/download via FUSE

---
 cmd/mount/mount.go     |  6 ++--
 cmd/mount/read.go      | 75 ++++++++++++++++++++++++++++++++----------
 cmd/mount/read_test.go | 40 ++++++++++++++++++++++
 cmd/mount/write.go     | 32 ++++++++++++++++-
 4 files changed, 130 insertions(+), 23 deletions(-)

diff --git a/cmd/mount/mount.go b/cmd/mount/mount.go
index be3984341..d21a177b6 100644
--- a/cmd/mount/mount.go
+++ b/cmd/mount/mount.go
@@ -22,6 +22,7 @@ import (
 // Globals
 var (
 	noModTime    = false
+	noChecksum   = false
 	debugFUSE    = false
 	noSeek       = false
 	dirCacheTime = 5 * 60 * time.Second
@@ -47,6 +48,7 @@ func init() {
 	unix.Umask(umask) // set it back to what it was
 	cmd.Root.AddCommand(commandDefintion)
 	commandDefintion.Flags().BoolVarP(&noModTime, "no-modtime", "", noModTime, "Don't read/write the modification time (can speed things up).")
+	commandDefintion.Flags().BoolVarP(&noChecksum, "no-checksum", "", noChecksum, "Don't compare checksums on up/download.")
 	commandDefintion.Flags().BoolVarP(&debugFUSE, "debug-fuse", "", debugFUSE, "Debug the FUSE internals - needs -v.")
 	commandDefintion.Flags().BoolVarP(&noSeek, "no-seek", "", noSeek, "Don't allow seeking in files.")
 	commandDefintion.Flags().DurationVarP(&dirCacheTime, "dir-cache-time", "", dirCacheTime, "Time to cache directory entries for.")
@@ -126,10 +128,6 @@ files to be visible in the mount.
     * those which need to know the size in advance won't - eg B2
     * maybe should pass in size as -1 to mean work it out
     * Or put in an an upload cache to cache the files on disk first
-
-### TODO ###
-
-  * Check hashes on upload/download
 `,
 	Run: func(command *cobra.Command, args []string) {
 		cmd.CheckArgs(2, 2, command, args)
diff --git a/cmd/mount/read.go b/cmd/mount/read.go
index a468fde3a..2fcb7ea41 100644
--- a/cmd/mount/read.go
+++ b/cmd/mount/read.go
@@ -9,6 +9,7 @@ import (
 	"bazil.org/fuse"
 	fusefs "bazil.org/fuse/fs"
 	"github.com/ncw/rclone/fs"
+	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 )
 
@@ -20,6 +21,7 @@ type ReadFileHandle struct {
 	o          fs.Object
 	readCalled bool // set if read has been called
 	offset     int64
+	hash       *fs.MultiHasher
 }
 
 func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) {
@@ -27,9 +29,19 @@ func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	var hash *fs.MultiHasher
+	if !noChecksum {
+		hash, err = fs.NewMultiHasherTypes(o.Fs().Hashes())
+		if err != nil {
+			fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err)
+		}
+	}
+
 	fh := &ReadFileHandle{
-		o: o,
-		r: fs.NewAccount(r, o).WithBuffer(), // account the transfer
+		o:    o,
+		r:    fs.NewAccount(r, o).WithBuffer(), // account the transfer
+		hash: hash,
 	}
 	fs.Stats.Transferring(fh.o.Remote())
 	return fh, nil
@@ -48,6 +60,7 @@ var _ fusefs.HandleReader = (*ReadFileHandle)(nil)
 // Must be called with fh.mu held
 func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
 	fh.r.StopBuffering() // stop the background reading first
+	fh.hash = nil
 	oldReader := fh.r.GetReader()
 	r := oldReader
 	// Can we seek it directly?
@@ -141,6 +154,14 @@ func (fh *ReadFileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp
 		resp.Data = buf[:n]
 		fh.offset = newOffset
 		fs.Debugf(fh.o, "ReadFileHandle.Read OK")
+
+		if fh.hash != nil {
+			_, err = fh.hash.Write(resp.Data)
+			if err != nil {
+				fs.Errorf(fh.o, "ReadFileHandle.Read HashError: %v", err)
+				return err
+			}
+		}
 	}
 	return err
 }
@@ -155,12 +176,42 @@ func (fh *ReadFileHandle) close() error {
 	}
 	fh.closed = true
 	fs.Stats.DoneTransferring(fh.o.Remote(), true)
+
+	if err := fh.checkHash(); err != nil {
+		// Still release the reader on a checksum mismatch so that it
+		// is not leaked; the checksum error takes precedence.
+		_ = fh.r.Close()
+		return err
+	}
+
 	return fh.r.Close()
 }
 
 // Check interface satisfied
 var _ fusefs.HandleFlusher = (*ReadFileHandle)(nil)
 
+// checkHash compares the hashes accumulated while reading against
+// those reported by the remote object. It returns nil without
+// comparing unless hashing is active, Read has been called and the
+// read offset has reached the size of the object.
+func (fh *ReadFileHandle) checkHash() error {
+	if fh.hash == nil || !fh.readCalled || fh.offset < fh.o.Size() {
+		return nil
+	}
+
+	for hashType, dstSum := range fh.hash.Sums() {
+		srcSum, err := fh.o.Hash(hashType)
+		if err != nil {
+			return err
+		}
+		if !fs.HashEquals(dstSum, srcSum) {
+			return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, dstSum, srcSum)
+		}
+	}
+
+	return nil
+}
+
 // Flush is called each time the file or directory is closed.
 // Because there can be multiple file descriptors referring to a
 // single opened file, Flush can be called multiple times.
@@ -169,23 +220,11 @@ func (fh *ReadFileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) err
 	defer fh.mu.Unlock()
 	fs.Debugf(fh.o, "ReadFileHandle.Flush")
 
-	// Ignore the Flush as there is nothing we can sensibly do and
-	// it seems quite common for Flush to be called from
-	// different threads each of which have read some data.
-	if false {
-		// If Read hasn't been called then ignore the Flush - Release
-		// will pick it up
-		if !fh.readCalled {
-			fs.Debugf(fh.o, "ReadFileHandle.Flush ignoring flush on unread handle")
-			return nil
-
-		}
-		err := fh.close()
-		if err != nil {
-			fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err)
-			return err
-		}
+	if err := fh.checkHash(); err != nil {
+		fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err)
+		return err
 	}
+
 	fs.Debugf(fh.o, "ReadFileHandle.Flush OK")
 	return nil
 }
diff --git a/cmd/mount/read_test.go b/cmd/mount/read_test.go
index f200aef89..6757e53c2 100644
--- a/cmd/mount/read_test.go
+++ b/cmd/mount/read_test.go
@@ -37,6 +37,46 @@ func TestReadByByte(t *testing.T) {
 	run.rm(t, "testfile")
 }
 
+func TestReadChecksum(t *testing.T) {
+	run.skipIfNoFUSE(t)
+
+	// create file big enough so we exceed any single FUSE read
+	// request
+	b := make([]rune, 3*128*1024)
+	for i := range b {
+		b[i] = 'r'
+	}
+	run.createFile(t, "bigfile", string(b))
+
+	// Read only the start of the file: the hash comparison must be
+	// skipped, as it would fail for a partially read file
+	fd, err := os.Open(run.path("bigfile"))
+	assert.NoError(t, err)
+	buf := make([]byte, 10)
+	_, err = io.ReadFull(fd, buf)
+	assert.NoError(t, err)
+	err = fd.Close()
+	assert.NoError(t, err)
+
+	// Read only the start and the end of the file: the hash
+	// comparison must be skipped here as well
+	fd, err = os.Open(run.path("bigfile"))
+	assert.NoError(t, err)
+	// read at start
+	_, err = io.ReadFull(fd, buf)
+	assert.NoError(t, err)
+	// read at end
+	_, err = fd.Seek(int64(len(b)-len(buf)), 0)
+	assert.NoError(t, err)
+	_, err = io.ReadFull(fd, buf)
+	assert.NoError(t, err)
+	// ensure we don't compare hashes
+	err = fd.Close()
+	assert.NoError(t, err)
+
+	run.rm(t, "bigfile")
+}
+
 // Test double close
 func TestReadFileDoubleClose(t *testing.T) {
 	run.skipIfNoFUSE(t)
diff --git a/cmd/mount/write.go b/cmd/mount/write.go
index 6ab7263a6..a285edc07 100644
--- a/cmd/mount/write.go
+++ b/cmd/mount/write.go
@@ -3,13 +3,13 @@
 package mount
 
 import (
-	"errors"
 	"io"
 	"sync"
 
 	"bazil.org/fuse"
 	fusefs "bazil.org/fuse/fs"
 	"github.com/ncw/rclone/fs"
+	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 )
 
@@ -26,17 +26,29 @@ type WriteFileHandle struct {
 	result      chan error
 	file        *File
 	writeCalled bool // set the first time Write() is called
+	hash        *fs.MultiHasher
 }
 
 // Check interface satisfied
 var _ fusefs.Handle = (*WriteFileHandle)(nil)
 
 func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) {
+	var hash *fs.MultiHasher
+	if !noChecksum {
+		var err error
+		hash, err = fs.NewMultiHasherTypes(src.Fs().Hashes())
+		if err != nil {
+			fs.Errorf(src.Fs(), "newWriteFileHandle hash error: %v", err)
+		}
+	}
+
 	fh := &WriteFileHandle{
 		remote: src.Remote(),
 		result: make(chan error, 1),
 		file:   f,
+		hash:   hash,
 	}
+
 	fh.pipeReader, fh.pipeWriter = io.Pipe()
 	r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer
 	go func() {
@@ -71,6 +83,13 @@ func (fh *WriteFileHandle) Write(ctx context.Context, req *fuse.WriteRequest, re
 		return err
 	}
 	fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n)
+	if fh.hash != nil {
+		_, err = fh.hash.Write(req.Data)
+		if err != nil {
+			fs.Errorf(fh.remote, "WriteFileHandle.Write HashError: %v", err)
+			return err
+		}
+	}
 	return nil
 }
 
@@ -95,6 +114,17 @@ func (fh *WriteFileHandle) close() error {
 	if err == nil {
 		err = readCloseErr
 	}
+	if err == nil && fh.hash != nil {
+		for hashType, srcSum := range fh.hash.Sums() {
+			dstSum, err := fh.o.Hash(hashType)
+			if err != nil {
+				return err
+			}
+			if !fs.HashEquals(srcSum, dstSum) {
+				return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
+			}
+		}
+	}
 	return err
 }
 