1
0
mirror of https://github.com/veggiedefender/torrent-client.git synced 2025-11-06 09:29:16 +02:00

Split client into its own package

This commit is contained in:
Jesse Li
2020-01-02 13:48:21 -05:00
parent f46d43985a
commit e9edc62c57
2 changed files with 50 additions and 43 deletions

View File

@@ -1,4 +1,4 @@
package p2p package client
import ( import (
"bufio" "bufio"
@@ -16,14 +16,15 @@ import (
"github.com/veggiedefender/torrent-client/handshake" "github.com/veggiedefender/torrent-client/handshake"
) )
type client struct { // A Client is a TCP connection with a peer
type Client struct {
Conn net.Conn
Choked bool
Bitfield bitfield.Bitfield
peer peers.Peer peer peers.Peer
infoHash [20]byte infoHash [20]byte
peerID [20]byte peerID [20]byte
conn net.Conn
reader *bufio.Reader reader *bufio.Reader
bitfield bitfield.Bitfield
choked bool
} }
func completeHandshake(conn net.Conn, r *bufio.Reader, infohash, peerID [20]byte) (*handshake.Handshake, error) { func completeHandshake(conn net.Conn, r *bufio.Reader, infohash, peerID [20]byte) (*handshake.Handshake, error) {
@@ -62,7 +63,9 @@ func recvBitfield(conn net.Conn, r *bufio.Reader) (bitfield.Bitfield, error) {
return msg.Payload, nil return msg.Payload, nil
} }
func newClient(peer peers.Peer, peerID, infoHash [20]byte) (*client, error) { // New connects with a peer, completes a handshake, and receives a handshake
// returns an err if any of those fail.
func New(peer peers.Peer, peerID, infoHash [20]byte) (*Client, error) {
hostPort := net.JoinHostPort(peer.IP.String(), strconv.Itoa(int(peer.Port))) hostPort := net.JoinHostPort(peer.IP.String(), strconv.Itoa(int(peer.Port)))
conn, err := net.DialTimeout("tcp", hostPort, 3*time.Second) conn, err := net.DialTimeout("tcp", hostPort, 3*time.Second)
if err != nil { if err != nil {
@@ -82,56 +85,59 @@ func newClient(peer peers.Peer, peerID, infoHash [20]byte) (*client, error) {
return nil, err return nil, err
} }
return &client{ return &Client{
Conn: conn,
Choked: true,
Bitfield: bf,
peer: peer, peer: peer,
infoHash: infoHash, infoHash: infoHash,
peerID: peerID, peerID: peerID,
conn: conn,
reader: reader, reader: reader,
bitfield: bf,
choked: true,
}, nil }, nil
} }
func (c *client) hasPiece(index int) bool { // HasNext returns true if there are unread messages from the peer
return c.bitfield.HasPiece(index) func (c *Client) HasNext() bool {
}
func (c *client) hasNext() bool {
return c.reader.Buffered() > 0 return c.reader.Buffered() > 0
} }
func (c *client) read() (*message.Message, error) { // Read reads and consumes a message from the connection
func (c *Client) Read() (*message.Message, error) {
msg, err := message.Read(c.reader) msg, err := message.Read(c.reader)
return msg, err return msg, err
} }
func (c *client) sendRequest(index, begin, length int) error { // SendRequest sends a Request message to the peer
func (c *Client) SendRequest(index, begin, length int) error {
req := message.FormatRequest(index, begin, length) req := message.FormatRequest(index, begin, length)
_, err := c.conn.Write(req.Serialize()) _, err := c.Conn.Write(req.Serialize())
return err return err
} }
func (c *client) sendInterested() error { // SendInterested sends an Interested message to the peer
func (c *Client) SendInterested() error {
msg := message.Message{ID: message.MsgInterested} msg := message.Message{ID: message.MsgInterested}
_, err := c.conn.Write(msg.Serialize()) _, err := c.Conn.Write(msg.Serialize())
return err return err
} }
func (c *client) sendNotInterested() error { // SendNotInterested sends a NotInterested message to the peer
func (c *Client) SendNotInterested() error {
msg := message.Message{ID: message.MsgNotInterested} msg := message.Message{ID: message.MsgNotInterested}
_, err := c.conn.Write(msg.Serialize()) _, err := c.Conn.Write(msg.Serialize())
return err return err
} }
func (c *client) sendUnchoke() error { // SendUnchoke sends an Unchoke message to the peer
func (c *Client) SendUnchoke() error {
msg := message.Message{ID: message.MsgUnchoke} msg := message.Message{ID: message.MsgUnchoke}
_, err := c.conn.Write(msg.Serialize()) _, err := c.Conn.Write(msg.Serialize())
return err return err
} }
func (c *client) sendHave(index int) error { // SendHave sends a Have message to the peer
func (c *Client) SendHave(index int) error {
msg := message.FormatHave(index) msg := message.FormatHave(index)
_, err := c.conn.Write(msg.Serialize()) _, err := c.Conn.Write(msg.Serialize())
return err return err
} }

View File

@@ -8,6 +8,7 @@ import (
"runtime" "runtime"
"time" "time"
"github.com/veggiedefender/torrent-client/client"
"github.com/veggiedefender/torrent-client/message" "github.com/veggiedefender/torrent-client/message"
"github.com/veggiedefender/torrent-client/peers" "github.com/veggiedefender/torrent-client/peers"
) )
@@ -42,7 +43,7 @@ type pieceResult struct {
type pieceProgress struct { type pieceProgress struct {
index int index int
client *client client *client.Client
buf []byte buf []byte
downloaded int downloaded int
requested int requested int
@@ -50,7 +51,7 @@ type pieceProgress struct {
} }
func (state *pieceProgress) readMessage() error { func (state *pieceProgress) readMessage() error {
msg, err := state.client.read() // this call blocks msg, err := state.client.Read() // this call blocks
if err != nil { if err != nil {
return err return err
} }
@@ -61,15 +62,15 @@ func (state *pieceProgress) readMessage() error {
switch msg.ID { switch msg.ID {
case message.MsgUnchoke: case message.MsgUnchoke:
state.client.choked = false state.client.Choked = false
case message.MsgChoke: case message.MsgChoke:
state.client.choked = true state.client.Choked = true
case message.MsgHave: case message.MsgHave:
index, err := message.ParseHave(msg) index, err := message.ParseHave(msg)
if err != nil { if err != nil {
return err return err
} }
state.client.bitfield.SetPiece(index) state.client.Bitfield.SetPiece(index)
case message.MsgPiece: case message.MsgPiece:
n, err := message.ParsePiece(state.index, state.buf, msg) n, err := message.ParsePiece(state.index, state.buf, msg)
if err != nil { if err != nil {
@@ -86,7 +87,7 @@ func (state *pieceProgress) readMessages() error {
if err != nil { if err != nil {
return err return err
} }
for state.client.hasNext() { for state.client.HasNext() {
err := state.readMessage() err := state.readMessage()
if err != nil { if err != nil {
return err return err
@@ -95,7 +96,7 @@ func (state *pieceProgress) readMessages() error {
return nil return nil
} }
func attemptDownloadPiece(c *client, pw *pieceWork) ([]byte, error) { func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) {
state := pieceProgress{ state := pieceProgress{
index: pw.index, index: pw.index,
client: c, client: c,
@@ -104,12 +105,12 @@ func attemptDownloadPiece(c *client, pw *pieceWork) ([]byte, error) {
// Setting a deadline helps get unresponsive peers unstuck. // Setting a deadline helps get unresponsive peers unstuck.
// 30 seconds is more than enough time to download a 262 KB piece // 30 seconds is more than enough time to download a 262 KB piece
c.conn.SetDeadline(time.Now().Add(30 * time.Second)) c.Conn.SetDeadline(time.Now().Add(30 * time.Second))
defer c.conn.SetDeadline(time.Time{}) // Disable the deadline defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline
for state.downloaded < pw.length { for state.downloaded < pw.length {
// If unchoked, send requests until we have enough unfulfilled requests // If unchoked, send requests until we have enough unfulfilled requests
if !state.client.choked { if !state.client.Choked {
for state.backlog < MaxBacklog && state.requested < pw.length { for state.backlog < MaxBacklog && state.requested < pw.length {
blockSize := MaxBlockSize blockSize := MaxBlockSize
// Last block might be shorter than the typical block // Last block might be shorter than the typical block
@@ -117,7 +118,7 @@ func attemptDownloadPiece(c *client, pw *pieceWork) ([]byte, error) {
blockSize = pw.length - state.requested blockSize = pw.length - state.requested
} }
err := c.sendRequest(pw.index, state.requested, blockSize) err := c.SendRequest(pw.index, state.requested, blockSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -145,19 +146,19 @@ func checkIntegrity(pw *pieceWork, buf []byte) error {
} }
func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) { func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) {
c, err := newClient(peer, t.PeerID, t.InfoHash) c, err := client.New(peer, t.PeerID, t.InfoHash)
if err != nil { if err != nil {
log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP) log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP)
return return
} }
defer c.conn.Close() defer c.Conn.Close()
log.Printf("Completed handshake with %s\n", peer.IP) log.Printf("Completed handshake with %s\n", peer.IP)
c.sendUnchoke() c.SendUnchoke()
c.sendInterested() c.SendInterested()
for pw := range workQueue { for pw := range workQueue {
if !c.hasPiece(pw.index) { if !c.Bitfield.HasPiece(pw.index) {
workQueue <- pw // Put piece back on the queue workQueue <- pw // Put piece back on the queue
continue continue
} }
@@ -177,7 +178,7 @@ func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork
continue continue
} }
c.sendHave(pw.index) c.SendHave(pw.index)
results <- &pieceResult{pw.index, buf} results <- &pieceResult{pw.index, buf}
} }
} }