1
0
mirror of https://github.com/veggiedefender/torrent-client.git synced 2025-11-06 09:29:16 +02:00
Files
torrent-client/p2p/p2p.go
2020-01-04 11:01:01 -05:00

218 lines
5.1 KiB
Go

package p2p
import (
"bytes"
"crypto/sha1"
"fmt"
"log"
"runtime"
"time"
"github.com/veggiedefender/torrent-client/client"
"github.com/veggiedefender/torrent-client/message"
"github.com/veggiedefender/torrent-client/peers"
)
// MaxBlockSize is the largest number of bytes a request can ask for
const MaxBlockSize = 16384
// MaxBacklog is the number of unfulfilled requests a client can have in its pipeline
const MaxBacklog = 5
// Torrent holds data required to download a torrent from a list of peers
type Torrent struct {
Peers []peers.Peer
PeerID [20]byte
InfoHash [20]byte
PieceHashes [][20]byte
PieceLength int
Length int
Name string
}
type pieceWork struct {
index int
hash [20]byte
length int
}
type pieceResult struct {
index int
buf []byte
}
type pieceProgress struct {
index int
client *client.Client
buf []byte
downloaded int
requested int
backlog int
}
func (state *pieceProgress) readMessage() error {
msg, err := state.client.Read() // this call blocks
if err != nil {
return err
}
if msg == nil { // keep-alive
return nil
}
switch msg.ID {
case message.MsgUnchoke:
state.client.Choked = false
case message.MsgChoke:
state.client.Choked = true
case message.MsgHave:
index, err := message.ParseHave(msg)
if err != nil {
return err
}
state.client.Bitfield.SetPiece(index)
case message.MsgPiece:
n, err := message.ParsePiece(state.index, state.buf, msg)
if err != nil {
return err
}
state.downloaded += n
state.backlog--
}
return nil
}
func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
state := pieceProgress{
index: pw.index,
client: c,
buf: make([]byte, pw.length),
}
// Setting a deadline helps get unresponsive peers unstuck.
// 30 seconds is more than enough time to download a 262 KB piece
c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline
for state.downloaded < pw.length {
// If unchoked, send requests until we have enough unfulfilled requests
if !state.client.Choked {
for state.backlog < MaxBacklog && state.requested < pw.length {
blockSize := MaxBlockSize
// Last block might be shorter than the typical block
if pw.length-state.requested < blockSize {
blockSize = pw.length - state.requested
}
err := c.SendRequest(pw.index, state.requested, blockSize)
if err != nil {
return nil, err
}
state.backlog++
state.requested += blockSize
}
}
err := state.readMessage()
if err != nil {
return nil, err
}
}
return state.buf, nil
}
func checkIntegrity(pw *pieceWork, buf []byte) error {
hash := sha1.Sum(buf)
if !bytes.Equal(hash[:], pw.hash[:]) {
return fmt.Errorf("Index %d failed integrity check", pw.index)
}
return nil
}
func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
c, err := client.New(peer, t.PeerID, t.InfoHash)
if err != nil {
log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP)
return
}
defer c.Conn.Close()
log.Printf("Completed handshake with %s\n", peer.IP)
c.SendUnchoke()
c.SendInterested()
for pw := range workQueue {
if !c.Bitfield.HasPiece(pw.index) {
workQueue <- pw // Put piece back on the queue
continue
}
// Download the piece
buf, err := attemptDownloadPiece(c, pw)
if err != nil {
log.Println("Exiting", err)
workQueue <- pw // Put piece back on the queue
return
}
err = checkIntegrity(pw, buf)
if err != nil {
log.Printf("Piece #%d failed integrity check\n", pw.index)
workQueue <- pw // Put piece back on the queue
continue
}
c.SendHave(pw.index)
results <- &pieceResult{pw.index, buf}
}
}
func (t *Torrent) calculateBoundsForPiece(index int) (begin int, end int) {
begin = index * t.PieceLength
end = begin + t.PieceLength
if end > t.Length {
end = t.Length
}
return begin, end
}
func (t *Torrent) calculatePieceSize(index int) int {
begin, end := t.calculateBoundsForPiece(index)
return end - begin
}
// Download downloads the torrent. This stores the entire file in memory.
func (t *Torrent) Download() ([]byte, error) {
log.Println("Starting download for", t.Name)
// Init queues for workers to retrieve work and send results
workQueue := make(chan *pieceWork, len(t.PieceHashes))
results := make(chan *pieceResult)
for index, hash := range t.PieceHashes {
length := t.calculatePieceSize(index)
workQueue <- &pieceWork{index, hash, length}
}
// Start workers
for _, peer := range t.Peers {
go t.startDownloadWorker(peer, workQueue, results)
}
// Collect results into a buffer until full
buf := make([]byte, t.Length)
donePieces := 0
for donePieces < len(t.PieceHashes) {
res := <-results
begin, end := t.calculateBoundsForPiece(res.index)
copy(buf[begin:end], res.buf)
donePieces++
percent := float64(donePieces) / float64(len(t.PieceHashes)) * 100
numWorkers := runtime.NumGoroutine() - 1 // subtract 1 for main thread
log.Printf("(%0.2f%%) Downloaded piece #%d from %d peers\n", percent, res.index, numWorkers)
}
close(workQueue)
return buf, nil
}