1
0
mirror of https://github.com/veggiedefender/torrent-client.git synced 2025-11-06 17:39:54 +02:00
Files
torrent-client/p2p/p2p.go

227 lines
5.1 KiB
Go
Raw Normal View History

2019-12-22 17:43:39 -05:00
package p2p
import (
2019-12-26 21:53:11 -05:00
"bytes"
"crypto/sha1"
2019-12-26 17:04:32 -05:00
"fmt"
2019-12-26 21:53:11 -05:00
"log"
2019-12-22 17:43:39 -05:00
"net"
2019-12-29 14:02:50 -05:00
"runtime"
"time"
2019-12-26 21:53:11 -05:00
"github.com/veggiedefender/torrent-client/message"
2019-12-22 17:43:39 -05:00
)
// Limits applied when requesting blocks of a piece from a peer.
const (
	// MaxBlockSize is the largest number of bytes a request can ask for
	MaxBlockSize = 16384

	// MaxBacklog is the number of unfulfilled requests a client can have in its pipeline
	MaxBacklog = 5
)
2019-12-26 21:53:11 -05:00
2019-12-22 22:25:57 -05:00
// Peer encodes connection information for a peer
type Peer struct {
	// IP is the address of the peer.
	IP net.IP
	// Port is the port of the peer.
	Port uint16
}
2019-12-29 14:02:50 -05:00
// Torrent holds data required to download a torrent from a list of peers
type Torrent struct {
2019-12-22 23:51:31 -05:00
Peers []Peer
PeerID [20]byte
InfoHash [20]byte
PieceHashes [][20]byte
Length int
Name string
2019-12-22 22:25:57 -05:00
}
2019-12-22 17:43:39 -05:00
2019-12-26 17:04:32 -05:00
// pieceWork describes one piece that still has to be downloaded.
// Field order matters: values are built with positional composite literals.
type pieceWork struct {
	index  int      // position of the piece within the torrent
	hash   [20]byte // SHA-1 hash the downloaded piece must match
	length int      // number of bytes to download for this piece
}
2019-12-29 14:02:50 -05:00
// pieceResult carries a downloaded piece that passed its integrity check.
// Field order matters: values are built with positional composite literals.
type pieceResult struct {
	index int    // position of the piece within the torrent
	buf   []byte // contents of the piece
}
2019-12-29 14:53:31 -05:00
type downloadState struct {
index int
client *client
buf []byte
downloaded int
requested int
2019-12-29 14:53:31 -05:00
backlog int
}
func readMessage(state *downloadState) error {
msg, err := state.client.read() // this call blocks
if err != nil {
return err
}
if msg == nil { // keep-alive
return nil
}
switch msg.ID {
case message.MsgUnchoke:
state.client.choked = false
case message.MsgChoke:
state.client.choked = true
case message.MsgHave:
index, err := message.ParseHave(msg)
if err != nil {
return err
}
state.client.bitfield.SetPiece(index)
case message.MsgPiece:
n, err := message.ParsePiece(state.index, state.buf, msg)
if err != nil {
return err
}
state.downloaded += n
state.backlog--
2019-12-26 21:53:11 -05:00
}
return nil
}
// readMessages consumes at least one message from the peer, then keeps
// consuming for as long as the client reports that another one is available.
// It stops at, and returns, the first error from readMessage.
func readMessages(state *downloadState) error {
	for {
		if err := readMessage(state); err != nil {
			return err
		}
		if !state.client.hasNext() {
			return nil
		}
	}
}
2019-12-29 14:02:50 -05:00
func attemptDownloadPiece(c *client, pw *pieceWork) ([]byte, error) {
2019-12-29 14:53:31 -05:00
state := downloadState{
index: pw.index,
client: c,
buf: make([]byte, pw.length),
}
// Setting a deadline helps get unresponsive peers unstuck.
// 30 seconds is more than enough time to download a 262 KB piece
c.conn.SetDeadline(time.Now().Add(30 * time.Second))
defer c.conn.SetDeadline(time.Time{}) // Disable the deadline
2019-12-29 14:53:31 -05:00
for state.downloaded < pw.length {
2019-12-30 10:19:09 -05:00
// If unchoked, send requests until we have enough unfulfilled requests
if !state.client.choked {
for state.backlog < MaxBacklog && state.requested < pw.length {
blockSize := MaxBlockSize
// Last block might be shorter than the typical block
if pw.length-state.requested < blockSize {
blockSize = pw.length - state.requested
}
c.request(pw.index, state.requested, blockSize)
state.backlog++
state.requested += blockSize
2019-12-26 21:53:11 -05:00
}
}
2019-12-30 10:19:09 -05:00
// Wait until we receive at least one message, and consume them
err := readMessages(&state)
2019-12-26 21:53:11 -05:00
if err != nil {
return nil, err
}
}
2019-12-29 14:53:31 -05:00
return state.buf, nil
}
func checkIntegrity(pw *pieceWork, buf []byte) error {
hash := sha1.Sum(buf)
if !bytes.Equal(hash[:], pw.hash[:]) {
return fmt.Errorf("Index %d failed integrity check", pw.index)
}
return nil
2019-12-29 14:02:50 -05:00
}
2019-12-26 21:53:11 -05:00
func (t *Torrent) startDownloadWorker(peer Peer, workQueue chan *pieceWork, results chan *pieceResult) {
2019-12-29 14:02:50 -05:00
c, err := newClient(peer, t.PeerID, t.InfoHash)
2019-12-26 21:53:11 -05:00
if err != nil {
2019-12-29 14:53:31 -05:00
log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP)
2019-12-29 14:02:50 -05:00
return
2019-12-26 21:53:11 -05:00
}
2019-12-29 14:02:50 -05:00
defer c.conn.Close()
2019-12-29 14:53:31 -05:00
log.Printf("Completed handshake with %s\n", peer.IP)
2019-12-26 21:53:11 -05:00
2019-12-29 14:02:50 -05:00
c.unchoke()
c.interested()
2019-12-26 21:53:11 -05:00
2019-12-29 14:02:50 -05:00
for pw := range workQueue {
if !c.hasPiece(pw.index) {
workQueue <- pw // Put piece back on the queue
continue
2019-12-26 21:53:11 -05:00
}
2019-12-24 13:23:46 -05:00
2019-12-29 14:02:50 -05:00
// Download the piece
buf, err := attemptDownloadPiece(c, pw)
2019-12-26 21:53:11 -05:00
if err != nil {
2019-12-29 14:02:50 -05:00
log.Println("Exiting", err)
workQueue <- pw // Put piece back on the queue
return
2019-12-26 21:53:11 -05:00
}
2019-12-29 14:02:50 -05:00
err = checkIntegrity(pw, buf)
2019-12-26 21:53:11 -05:00
if err != nil {
2019-12-29 14:02:50 -05:00
log.Printf("Piece #%d failed integrity check\n", pw.index)
workQueue <- pw // Put piece back on the queue
continue
2019-12-26 21:53:11 -05:00
}
2019-12-29 14:02:50 -05:00
results <- &pieceResult{pw.index, buf}
c.have(pw.index)
2019-12-26 17:04:32 -05:00
}
2019-12-26 21:53:11 -05:00
}
2019-12-29 14:02:50 -05:00
// calculateBoundsForPiece returns the half-open byte range [begin, end) that
// the piece at index occupies in a buffer of length bytes split into numPieces
// pieces. Every piece is given length / numPieces bytes (integer division).
// numPieces must be non-zero.
func calculateBoundsForPiece(index, numPieces, length int) (begin int, end int) {
	size := length / numPieces
	begin = size * index
	return begin, begin + size
}
// Download downloads the torrent
func (t *Torrent) Download() ([]byte, error) {
log.Println("Starting download for", t.Name)
2019-12-29 14:02:50 -05:00
// Init queues for workers to retrieve work and send results
workQueue := make(chan *pieceWork, len(t.PieceHashes))
results := make(chan *pieceResult, len(t.PieceHashes))
for index, hash := range t.PieceHashes {
length := t.Length / len(t.PieceHashes)
workQueue <- &pieceWork{index, hash, length}
2019-12-26 21:53:11 -05:00
}
2019-12-29 14:02:50 -05:00
// Start workers
for _, peer := range t.Peers {
go t.startDownloadWorker(peer, workQueue, results)
2019-12-29 14:02:50 -05:00
}
2019-12-26 21:53:11 -05:00
2019-12-29 14:02:50 -05:00
// Collect results into a buffer until full
buf := make([]byte, t.Length)
donePieces := 0
for donePieces < len(t.PieceHashes) {
res := <-results
begin, end := calculateBoundsForPiece(res.index, len(t.PieceHashes), t.Length)
copy(buf[begin:end], res.buf)
donePieces++
percent := float64(donePieces) / float64(len(t.PieceHashes)) * 100
2019-12-29 15:15:53 -05:00
numWorkers := runtime.NumGoroutine() - 1 // subtract 1 for main thread
log.Printf("(%0.2f%%) Downloaded piece #%d from %d peers\n", percent, res.index, numWorkers)
2019-12-26 21:53:11 -05:00
}
2019-12-29 14:02:50 -05:00
close(workQueue)
2019-12-26 21:53:11 -05:00
2019-12-29 14:02:50 -05:00
return buf, nil
2019-12-24 13:23:46 -05:00
}