From 1b5b36523b358bc03f0655bcd902486113d26463 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood <nick@craig-wood.com>
Date: Thu, 24 Aug 2023 17:16:01 +0100
Subject: [PATCH] operations: fix accounting for multi-thread transfers

This stops the accounting moving in large chunks and gets it to move
as the data is read out of the buffer.
---
 fs/operations/multithread.go | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go
index a780bfd15..a2f73d7a4 100644
--- a/fs/operations/multithread.go
+++ b/fs/operations/multithread.go
@@ -91,16 +91,19 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
 	var rs io.ReadSeeker
 	if mc.noSeek {
 		// Read directly if we are sure we aren't going to seek
-		rs = readers.NoSeeker{Reader: rc}
+		// and account with accounting
+		rs = readers.NoSeeker{Reader: mc.acc.WrapStream(rc)}
 	} else {
 		// Read the chunk into buffered reader
-		wr := multipart.NewRW()
-		defer fs.CheckClose(wr, &err)
-		_, err = io.CopyN(wr, rc, size)
+		rw := multipart.NewRW()
+		defer fs.CheckClose(rw, &err)
+		_, err = io.CopyN(rw, rc, size)
 		if err != nil {
 			return fmt.Errorf("multi-thread copy: failed to read chunk: %w", err)
 		}
-		rs = wr
+		// Account as we go
+		rw.SetAccounting(mc.acc.AccountRead)
+		rs = rw
 	}
 
 	// Write the chunk
@@ -109,13 +112,6 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
 		return fmt.Errorf("multi-thread copy: failed to write chunk: %w", err)
 	}
 
-	// FIXME: Wrap ReadSeeker for Accounting
-	// However, to ensure reporting is correctly seeks have to be handled properly
-	errAccRead := mc.acc.AccountRead(int(bytesWritten))
-	if errAccRead != nil {
-		return errAccRead
-	}
-
 	fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v finished", stream+1, mc.numChunks, start, end, fs.SizeSuffix(bytesWritten))
 	return nil
 }