1
0
mirror of https://github.com/veggiedefender/torrent-client.git synced 2025-11-06 09:29:16 +02:00

Add p2p client

This commit is contained in:
Jesse
2019-12-26 17:04:32 -05:00
parent 5201cfc0b4
commit b1fc8c7fb8
3 changed files with 138 additions and 66 deletions

79
p2p/client.go Normal file
View File

@@ -0,0 +1,79 @@
package p2p
import (
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/veggiedefender/torrent-client/message"
"github.com/veggiedefender/torrent-client/handshake"
)
type client struct {
conn net.Conn
bitfield message.Bitfield
Choked bool
Mux sync.Mutex
}
func completeHandshake(conn net.Conn, infohash, peerID [20]byte) (*handshake.Handshake, error) {
conn.SetDeadline(time.Now().Local().Add(3 * time.Second))
defer conn.SetDeadline(time.Time{}) // Disable the deadline
req := handshake.New(infohash, peerID)
_, err := conn.Write(req.Serialize())
if err != nil {
return nil, err
}
res, err := handshake.Read(conn)
if err != nil {
return nil, err
}
return res, nil
}
func recvBitfield(conn net.Conn) (message.Bitfield, error) {
conn.SetDeadline(time.Now().Local().Add(5 * time.Second))
defer conn.SetDeadline(time.Time{}) // Disable the deadline
msg, err := message.Read(conn)
if err != nil {
return nil, err
}
if msg.ID != message.MsgBitfield {
err := fmt.Errorf("Expected bitfield but got ID %d", msg.ID)
return nil, err
}
return msg.Payload, nil
}
func newClient(peer Peer, peerID, infoHash [20]byte) (*client, error) {
hostPort := net.JoinHostPort(peer.IP.String(), strconv.Itoa(int(peer.Port)))
conn, err := net.DialTimeout("tcp", hostPort, 3*time.Second)
if err != nil {
return nil, err
}
_, err = completeHandshake(conn, infoHash, peerID)
if err != nil {
return nil, err
}
bf, err := recvBitfield(conn)
if err != nil {
return nil, err
}
return &client{
conn: conn,
bitfield: bf,
Mux: sync.Mutex{},
Choked: true,
}, nil
}
func (c *client) hasPiece(index int) bool {
return c.bitfield.HasPiece(index)
}

View File

@@ -1,11 +1,9 @@
package p2p
import (
"fmt"
"net"
"strconv"
"time"
"github.com/veggiedefender/torrent-client/handshake"
"sync"
)
// Peer encodes connection information for a peer
@@ -23,73 +21,66 @@ type Download struct {
Length int
}
type peerState struct {
peer *Peer
conn net.Conn
type pieceWork struct {
index int
hash [20]byte
}
type swarm struct {
peerStates []*peerState
}
func (p *Peer) connect(peerID [20]byte, infoHash [20]byte) (net.Conn, error) {
hostPort := net.JoinHostPort(p.IP.String(), strconv.Itoa(int(p.Port)))
conn, err := net.DialTimeout("tcp", hostPort, 3*time.Second)
if err != nil {
return nil, err
}
return conn, nil
}
func (d *Download) handshake(conn net.Conn) (*handshake.Handshake, error) {
conn.SetDeadline(time.Now().Local().Add(3 * time.Second))
req := handshake.New(d.InfoHash, d.PeerID)
_, err := conn.Write(req.Serialize())
if err != nil {
return nil, err
}
res, err := handshake.Read(conn)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Time{}) // Disable the deadline
return res, nil
}
func (d *Download) initPeer(p *Peer, c chan *peerState) {
conn, err := p.connect(d.PeerID, d.InfoHash)
if err != nil {
c <- nil
return
}
_, err = d.handshake(conn)
if err != nil {
c <- nil
return
}
c <- &peerState{p, conn}
}
func (d *Download) startSwarm() *swarm {
c := make(chan *peerState)
for i := range d.Peers {
go d.initPeer(&d.Peers[i], c)
}
peerStates := make([]*peerState, 0)
for range d.Peers {
ps := <-c
if ps != nil {
peerStates = append(peerStates, ps)
}
}
return &swarm{peerStates}
clients []*client
queue chan *pieceWork
mux sync.Mutex
}
// Download downloads a torrent
func (d *Download) Download() error {
d.startSwarm()
clients := d.initClients()
if len(clients) == 0 {
return fmt.Errorf("Could not connect to any of %d clients", len(d.Peers))
}
queue := make(chan *pieceWork, len(d.PieceHashes))
for index, hash := range d.PieceHashes {
queue <- &pieceWork{index, hash}
}
processQueue(clients, queue)
return nil
}
func (d *Download) initClients() []*client {
// Create clients in parallel
c := make(chan *client)
for _, p := range d.Peers {
go func(p Peer) {
client, err := newClient(p, d.PeerID, d.InfoHash)
if err != nil {
c <- nil
} else {
c <- client
}
}(p)
}
clients := make([]*client, 0)
for range d.Peers {
client := <-c
if client != nil {
clients = append(clients, client)
}
}
return clients
}
func (s *swarm) selectClient(index int) *client {
return s.clients[0]
}
func processQueue(clients []*client, queue chan *pieceWork) {
s := swarm{clients, queue, sync.Mutex{}}
for pw := range s.queue {
client := s.selectClient(pw.index)
fmt.Println(client.conn.RemoteAddr())
break
}
}

View File

@@ -6,6 +6,7 @@ import (
"crypto/sha1"
"fmt"
"io"
"net"
"github.com/jackpal/bencode-go"
"github.com/veggiedefender/torrent-client/p2p"
@@ -44,7 +45,8 @@ func (t *Torrent) Download() error {
return err
}
peers, err := t.getPeers(peerID, Port)
// peers, err := t.getPeers(peerID, Port)
peers := []p2p.Peer{{IP: net.IP{127, 0, 0, 1}, Port: 51413}}
downloader := p2p.Download{
Peers: peers,
PeerID: peerID,