You've already forked torrent-client
mirror of
https://github.com/veggiedefender/torrent-client.git
synced 2025-11-06 09:29:16 +02:00
Pipeline 5 requests at a time
This commit is contained in:
@@ -92,7 +92,6 @@ func (c *client) hasPiece(index int) bool {
|
||||
}
|
||||
|
||||
func (c *client) hasNext() bool {
|
||||
fmt.Println(c.reader.Buffered() > 0)
|
||||
return c.reader.Buffered() > 0
|
||||
}
|
||||
|
||||
|
||||
80
p2p/p2p.go
80
p2p/p2p.go
@@ -11,7 +11,7 @@ import (
|
||||
"github.com/veggiedefender/torrent-client/message"
|
||||
)
|
||||
|
||||
const maxBlockSize = 32768
|
||||
const maxBlockSize = 16384
|
||||
|
||||
// Peer encodes connection information for a peer
|
||||
type Peer struct {
|
||||
@@ -117,32 +117,47 @@ func downloadPiece(c *client, pw *pieceWork, pieceLength int) ([]byte, error) {
|
||||
buf := make([]byte, pieceLength)
|
||||
c.unchoke()
|
||||
c.interested()
|
||||
offset := 0
|
||||
for c.hasNext() {
|
||||
msg, err := c.read() // this call blocks
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msg == nil { // keep-alive
|
||||
continue
|
||||
}
|
||||
fmt.Println("CATCHING UP ON", msg)
|
||||
switch msg.ID {
|
||||
case message.MsgUnchoke:
|
||||
c.choked = false
|
||||
case message.MsgChoke:
|
||||
c.choked = true
|
||||
}
|
||||
}
|
||||
|
||||
for offset < pieceLength {
|
||||
if !c.choked {
|
||||
blockSize := maxBlockSize
|
||||
if pieceLength-offset < blockSize {
|
||||
// Last block might be shorter than the typical block
|
||||
blockSize = pieceLength - offset
|
||||
downloaded := 0
|
||||
requested := 0
|
||||
for downloaded < pieceLength {
|
||||
for c.hasNext() {
|
||||
msg, err := c.read() // this call blocks
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msg == nil { // keep-alive
|
||||
continue
|
||||
}
|
||||
if msg.ID != message.MsgPiece {
|
||||
fmt.Println(msg)
|
||||
}
|
||||
switch msg.ID {
|
||||
case message.MsgUnchoke:
|
||||
c.choked = false
|
||||
case message.MsgChoke:
|
||||
c.choked = true
|
||||
case message.MsgPiece:
|
||||
// fmt.Println(" PIECE")
|
||||
n, err := message.ParsePiece(pw.index, buf, msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
downloaded += n
|
||||
}
|
||||
}
|
||||
|
||||
n := 5
|
||||
if !c.choked && requested < pieceLength && requested-downloaded <= n+1 {
|
||||
for i := 0; i < n; i++ {
|
||||
blockSize := maxBlockSize
|
||||
if pieceLength-requested < blockSize {
|
||||
// Last block might be shorter than the typical block
|
||||
blockSize = pieceLength - requested
|
||||
}
|
||||
// fmt.Println("Request")
|
||||
c.request(pw.index, requested, blockSize)
|
||||
requested += blockSize
|
||||
}
|
||||
c.request(pw.index, offset, blockSize)
|
||||
}
|
||||
|
||||
msg, err := c.read() // this call blocks
|
||||
@@ -161,11 +176,12 @@ func downloadPiece(c *client, pw *pieceWork, pieceLength int) ([]byte, error) {
|
||||
case message.MsgChoke:
|
||||
c.choked = true
|
||||
case message.MsgPiece:
|
||||
// fmt.Println(" PIECE")
|
||||
n, err := message.ParsePiece(pw.index, buf, msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += n
|
||||
downloaded += n
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,6 +197,9 @@ func downloadPiece(c *client, pw *pieceWork, pieceLength int) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (s *swarm) removeClient(c *client) {
|
||||
if len(s.clients) == 1 {
|
||||
panic("Removed last client")
|
||||
}
|
||||
log.Printf("Removing client. %d clients remaining\n", len(s.clients))
|
||||
s.mux.Lock()
|
||||
var i int
|
||||
@@ -198,6 +217,7 @@ func (s *swarm) worker(d *Download, wg *sync.WaitGroup) {
|
||||
s.mux.Lock()
|
||||
c, err := s.selectClient(pw.index)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
// Re-enqueue the piece to try again
|
||||
s.queue <- pw
|
||||
s.mux.Unlock()
|
||||
@@ -211,7 +231,7 @@ func (s *swarm) worker(d *Download, wg *sync.WaitGroup) {
|
||||
pieceBuf, err := downloadPiece(c, pw, pieceLength)
|
||||
if err != nil {
|
||||
// Re-enqueue the piece to try again
|
||||
log.Println(err)
|
||||
// log.Println(err)
|
||||
s.removeClient(c)
|
||||
s.queue <- pw
|
||||
} else {
|
||||
@@ -220,7 +240,7 @@ func (s *swarm) worker(d *Download, wg *sync.WaitGroup) {
|
||||
|
||||
s.mux.Lock()
|
||||
s.piecesDone++
|
||||
log.Printf("Downloaded piece %d (%d/%d) %0.2f%%\n", pw.index, s.piecesDone, len(d.PieceHashes), float64(s.piecesDone)/float64(len(d.PieceHashes)))
|
||||
log.Printf("Downloaded piece %d (%d/%d) %0.2f%%\n", pw.index, s.piecesDone, len(d.PieceHashes), float64(s.piecesDone)/float64(len(d.PieceHashes))*100)
|
||||
s.mux.Unlock()
|
||||
|
||||
if s.piecesDone == len(d.PieceHashes) {
|
||||
@@ -243,7 +263,7 @@ func (d *Download) processQueue(clients []*client, queue chan *pieceWork) []byte
|
||||
mux: sync.Mutex{},
|
||||
}
|
||||
|
||||
numWorkers := len(s.clients) / 2
|
||||
numWorkers := (len(s.clients) + 1) / 2
|
||||
log.Printf("Spawning %d workers\n", numWorkers)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(numWorkers)
|
||||
|
||||
Reference in New Issue
Block a user