From 06fab656389950197c63ea885e1f8c08c3c38f9e Mon Sep 17 00:00:00 2001 From: Jesse Date: Fri, 27 Dec 2019 12:30:20 -0500 Subject: [PATCH] Pipeline 5 requests at a time --- p2p/client.go | 1 - p2p/p2p.go | 80 ++++++++++++++++++++++++++++++++------------------- 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/p2p/client.go b/p2p/client.go index 4a9d1b8..d66948f 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -92,7 +92,6 @@ func (c *client) hasPiece(index int) bool { } func (c *client) hasNext() bool { - fmt.Println(c.reader.Buffered() > 0) return c.reader.Buffered() > 0 } diff --git a/p2p/p2p.go b/p2p/p2p.go index f041194..89ba4db 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -11,7 +11,7 @@ import ( "github.com/veggiedefender/torrent-client/message" ) -const maxBlockSize = 32768 +const maxBlockSize = 16384 // Peer encodes connection information for a peer type Peer struct { @@ -117,32 +117,47 @@ func downloadPiece(c *client, pw *pieceWork, pieceLength int) ([]byte, error) { buf := make([]byte, pieceLength) c.unchoke() c.interested() - offset := 0 - for c.hasNext() { - msg, err := c.read() // this call blocks - if err != nil { - return nil, err - } - if msg == nil { // keep-alive - continue - } - fmt.Println("CATCHING UP ON", msg) - switch msg.ID { - case message.MsgUnchoke: - c.choked = false - case message.MsgChoke: - c.choked = true - } - } - - for offset < pieceLength { - if !c.choked { - blockSize := maxBlockSize - if pieceLength-offset < blockSize { - // Last block might be shorter than the typical block - blockSize = pieceLength - offset + downloaded := 0 + requested := 0 + for downloaded < pieceLength { + for c.hasNext() { + msg, err := c.read() // this call blocks + if err != nil { + return nil, err + } + if msg == nil { // keep-alive + continue + } + if msg.ID != message.MsgPiece { + fmt.Println(msg) + } + switch msg.ID { + case message.MsgUnchoke: + c.choked = false + case message.MsgChoke: + c.choked = true + case message.MsgPiece: + // fmt.Println(" PIECE") + n, err := message.ParsePiece(pw.index, buf, msg) + if err != nil { + return nil, err + } + downloaded += n + } + } + + n := 5 + if !c.choked && requested < pieceLength && requested-downloaded <= n+1 { + for i := 0; i < n; i++ { + blockSize := maxBlockSize + if pieceLength-requested < blockSize { + // Last block might be shorter than the typical block + blockSize = pieceLength - requested + } + // fmt.Println("Request") + c.request(pw.index, requested, blockSize) + requested += blockSize } - c.request(pw.index, offset, blockSize) } msg, err := c.read() // this call blocks @@ -161,11 +176,12 @@ func downloadPiece(c *client, pw *pieceWork, pieceLength int) ([]byte, error) { case message.MsgChoke: c.choked = true case message.MsgPiece: + // fmt.Println(" PIECE") n, err := message.ParsePiece(pw.index, buf, msg) if err != nil { return nil, err } - offset += n + downloaded += n } } @@ -181,6 +197,9 @@ func downloadPiece(c *client, pw *pieceWork, pieceLength int) ([]byte, error) { } func (s *swarm) removeClient(c *client) { + if len(s.clients) == 1 { + panic("Removed last client") + } log.Printf("Removing client. %d clients remaining\n", len(s.clients)) s.mux.Lock() var i int @@ -198,6 +217,7 @@ func (s *swarm) worker(d *Download, wg *sync.WaitGroup) { s.mux.Lock() c, err := s.selectClient(pw.index) if err != nil { + fmt.Println(err) // Re-enqueue the piece to try again s.queue <- pw s.mux.Unlock() @@ -211,7 +231,7 @@ func (s *swarm) worker(d *Download, wg *sync.WaitGroup) { pieceBuf, err := downloadPiece(c, pw, pieceLength) if err != nil { // Re-enqueue the piece to try again - log.Println(err) + // log.Println(err) s.removeClient(c) s.queue <- pw } else { @@ -220,7 +240,7 @@ func (s *swarm) worker(d *Download, wg *sync.WaitGroup) { s.mux.Lock() s.piecesDone++ - log.Printf("Downloaded piece %d (%d/%d) %0.2f%%\n", pw.index, s.piecesDone, len(d.PieceHashes), float64(s.piecesDone)/float64(len(d.PieceHashes))) + log.Printf("Downloaded piece %d (%d/%d) %0.2f%%\n", pw.index, s.piecesDone, len(d.PieceHashes), float64(s.piecesDone)/float64(len(d.PieceHashes))*100) s.mux.Unlock() if s.piecesDone == len(d.PieceHashes) { @@ -243,7 +263,7 @@ func (d *Download) processQueue(clients []*client, queue chan *pieceWork) []byte mux: sync.Mutex{}, } - numWorkers := len(s.clients) / 2 + numWorkers := (len(s.clients) + 1) / 2 log.Printf("Spawning %d workers\n", numWorkers) wg := sync.WaitGroup{} wg.Add(numWorkers)