From b1fc8c7fb80cbc87f4e4c458b04be70029703038 Mon Sep 17 00:00:00 2001 From: Jesse Date: Thu, 26 Dec 2019 17:04:32 -0500 Subject: [PATCH] Add p2p client --- p2p/client.go | 79 +++++++++++++++++++++++++++++ p2p/p2p.go | 121 +++++++++++++++++++++------------------------ torrent/torrent.go | 4 +- 3 files changed, 138 insertions(+), 66 deletions(-) create mode 100644 p2p/client.go diff --git a/p2p/client.go b/p2p/client.go new file mode 100644 index 0000000..4a8cd74 --- /dev/null +++ b/p2p/client.go @@ -0,0 +1,79 @@ +package p2p + +import ( + "fmt" + "net" + "strconv" + "sync" + "time" + + "github.com/veggiedefender/torrent-client/message" + + "github.com/veggiedefender/torrent-client/handshake" +) + +type client struct { + conn net.Conn + bitfield message.Bitfield + Choked bool + Mux sync.Mutex +} + +func completeHandshake(conn net.Conn, infohash, peerID [20]byte) (*handshake.Handshake, error) { + conn.SetDeadline(time.Now().Local().Add(3 * time.Second)) + defer conn.SetDeadline(time.Time{}) // Disable the deadline + + req := handshake.New(infohash, peerID) + _, err := conn.Write(req.Serialize()) + if err != nil { + return nil, err + } + + res, err := handshake.Read(conn) + if err != nil { + return nil, err + } + return res, nil +} + +func recvBitfield(conn net.Conn) (message.Bitfield, error) { + conn.SetDeadline(time.Now().Local().Add(5 * time.Second)) + defer conn.SetDeadline(time.Time{}) // Disable the deadline + + msg, err := message.Read(conn) + if err != nil { + return nil, err + } + if msg.ID != message.MsgBitfield { + err := fmt.Errorf("Expected bitfield but got ID %d", msg.ID) + return nil, err + } + + return msg.Payload, nil +} + +func newClient(peer Peer, peerID, infoHash [20]byte) (*client, error) { + hostPort := net.JoinHostPort(peer.IP.String(), strconv.Itoa(int(peer.Port))) + conn, err := net.DialTimeout("tcp", hostPort, 3*time.Second) + if err != nil { + return nil, err + } + _, err = completeHandshake(conn, infoHash, peerID) + if err != nil { + return nil, err + } + bf, err := recvBitfield(conn) + if err != nil { + return nil, err + } + return &client{ + conn: conn, + bitfield: bf, + Mux: sync.Mutex{}, + Choked: true, + }, nil +} + +func (c *client) hasPiece(index int) bool { + return c.bitfield.HasPiece(index) +} diff --git a/p2p/p2p.go b/p2p/p2p.go index 210829c..e1da35e 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -1,11 +1,9 @@ package p2p import ( + "fmt" "net" - "strconv" - "time" - - "github.com/veggiedefender/torrent-client/handshake" + "sync" ) // Peer encodes connection information for a peer @@ -23,73 +21,66 @@ type Download struct { Length int } -type peerState struct { - peer *Peer - conn net.Conn +type pieceWork struct { + index int + hash [20]byte } type swarm struct { - peerStates []*peerState -} - -func (p *Peer) connect(peerID [20]byte, infoHash [20]byte) (net.Conn, error) { - hostPort := net.JoinHostPort(p.IP.String(), strconv.Itoa(int(p.Port))) - conn, err := net.DialTimeout("tcp", hostPort, 3*time.Second) - if err != nil { - return nil, err - } - return conn, nil -} - -func (d *Download) handshake(conn net.Conn) (*handshake.Handshake, error) { - conn.SetDeadline(time.Now().Local().Add(3 * time.Second)) - req := handshake.New(d.InfoHash, d.PeerID) - _, err := conn.Write(req.Serialize()) - if err != nil { - return nil, err - } - - res, err := handshake.Read(conn) - if err != nil { - return nil, err - } - conn.SetDeadline(time.Time{}) // Disable the deadline - return res, nil -} - -func (d *Download) initPeer(p *Peer, c chan *peerState) { - conn, err := p.connect(d.PeerID, d.InfoHash) - if err != nil { - c <- nil - return - } - _, err = d.handshake(conn) - if err != nil { - c <- nil - return - } - c <- &peerState{p, conn} -} - -func (d *Download) startSwarm() *swarm { - c := make(chan *peerState) - for i := range d.Peers { - go d.initPeer(&d.Peers[i], c) - } - - peerStates := make([]*peerState, 0) - for range d.Peers { - ps := <-c - if ps != nil { - peerStates = append(peerStates, ps) - } - } - - return &swarm{peerStates} + clients []*client + queue chan *pieceWork + mux sync.Mutex } // Download downloads a torrent func (d *Download) Download() error { - d.startSwarm() + clients := d.initClients() + if len(clients) == 0 { + return fmt.Errorf("Could not connect to any of %d clients", len(d.Peers)) + } + + queue := make(chan *pieceWork, len(d.PieceHashes)) + for index, hash := range d.PieceHashes { + queue <- &pieceWork{index, hash} + } + processQueue(clients, queue) + return nil } + +func (d *Download) initClients() []*client { + // Create clients in parallel + c := make(chan *client) + for _, p := range d.Peers { + go func(p Peer) { + client, err := newClient(p, d.PeerID, d.InfoHash) + if err != nil { + c <- nil + } else { + c <- client + } + }(p) + } + + clients := make([]*client, 0) + for range d.Peers { + client := <-c + if client != nil { + clients = append(clients, client) + } + } + return clients +} + +func (s *swarm) selectClient(index int) *client { + return s.clients[0] +} + +func processQueue(clients []*client, queue chan *pieceWork) { + s := swarm{clients, queue, sync.Mutex{}} + for pw := range s.queue { + client := s.selectClient(pw.index) + fmt.Println(client.conn.RemoteAddr()) + break + } +} diff --git a/torrent/torrent.go b/torrent/torrent.go index 3997b22..12ed745 100644 --- a/torrent/torrent.go +++ b/torrent/torrent.go @@ -6,6 +6,7 @@ import ( "crypto/sha1" "fmt" "io" + "net" "github.com/jackpal/bencode-go" "github.com/veggiedefender/torrent-client/p2p" @@ -44,7 +45,8 @@ func (t *Torrent) Download() error { return err } - peers, err := t.getPeers(peerID, Port) + // peers, err := t.getPeers(peerID, Port) + peers := []p2p.Peer{{IP: net.IP{127, 0, 0, 1}, Port: 51413}} downloader := p2p.Download{ Peers: peers, PeerID: peerID,