From 7870d205730d2d7a446ef5e2ad1bf2433506633b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 29 Sep 2013 08:11:24 +1000 Subject: [PATCH] Begin implementing the adding of peers, and initiating of connections --- client.go | 117 +++++++++++++++++-- cmd/torrent/main.go | 8 ++ peer_protocol/protocol.go | 29 +++++ {protocol => peer_protocol}/protocol_test.go | 2 +- protocol/protocol.go | 116 ------------------ 5 files changed, 145 insertions(+), 127 deletions(-) create mode 100644 peer_protocol/protocol.go rename {protocol => peer_protocol}/protocol_test.go (87%) delete mode 100644 protocol/protocol.go diff --git a/client.go b/client.go index 54cf853c..e5811dcc 100644 --- a/client.go +++ b/client.go @@ -1,11 +1,15 @@ package torrent import ( + "bitbucket.org/anacrolix/go.torrent/peer_protocol" "crypto" + "crypto/rand" "errors" metainfo "github.com/nsf/libtorgo/torrent" "io" "launchpad.net/gommap" + "log" + "net" "os" "path/filepath" ) @@ -14,7 +18,7 @@ const ( PieceHash = crypto.SHA1 ) -type infoHash [20]byte +type InfoHash [20]byte type pieceSum [20]byte @@ -24,7 +28,7 @@ func copyHashSum(dst, src []byte) { } } -func BytesInfoHash(b []byte) (ih infoHash) { +func BytesInfoHash(b []byte) (ih InfoHash) { if len(b) != len(ih) { panic("bad infohash bytes") } @@ -44,11 +48,33 @@ type piece struct { Hash pieceSum } +type connection struct { + Socket net.Conn + post chan peer_protocol.Message + + Interested bool + Choked bool + Requests []peer_protocol.Request + + PeerId [20]byte + PeerInterested bool + PeerChoked bool + PeerRequests []peer_protocol.Request +} + type torrent struct { - InfoHash infoHash + InfoHash InfoHash Pieces []piece Data MMapSpan MetaInfo *metainfo.MetaInfo + Conns []connection + Peers []Peer +} + +type Peer struct { + Id [20]byte + IP net.IP + Port int } func (t torrent) PieceSize(piece int) (size int64) { @@ -79,25 +105,35 @@ func (t torrent) HashPiece(piece int) (ps pieceSum) { } type client struct { - DataDir string + DataDir string + HalfOpenLimit int + PeerId [20]byte + + halfOpen int + torrents map[InfoHash]*torrent noTorrents chan struct{} addTorrent chan *torrent - torrents map[infoHash]*torrent - torrentFinished chan infoHash + torrentFinished chan InfoHash actorTask chan func() } func NewClient(dataDir string) *client { c := &client{ - DataDir: dataDir, + DataDir: dataDir, + HalfOpenLimit: 10, + + torrents: make(map[InfoHash]*torrent), noTorrents: make(chan struct{}), addTorrent: make(chan *torrent), - torrents: make(map[infoHash]*torrent), - torrentFinished: make(chan infoHash), + torrentFinished: make(chan InfoHash), actorTask: make(chan func()), } + _, err := rand.Read(c.PeerId[:]) + if err != nil { + panic("error generating peer id") + } go c.run() return c } @@ -144,6 +180,67 @@ func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan return } +func (me *client) torrent(ih InfoHash) *torrent { + for _, t := range me.torrents { + if t.InfoHash == ih { + return t + } + } + return nil +} + +func (me *client) initiateConn(peer Peer, torrent *torrent) { + if peer.Id == me.PeerId { + return + } + me.halfOpen++ + go func() { + conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{ + IP: peer.IP, + Port: peer.Port, + }) + me.withContext(func() { + me.halfOpen-- + me.openNewConns() + }) + if err != nil { + log.Printf("error connecting to peer: %s", err) + return + } + me.runConnection(conn, torrent, peer.Id) + }() +} + +func (me *client) runConnection(conn net.Conn, torrent *torrent, peerId [20]byte) { + log.Fatalf("connected to %s", conn) +} + +func (me *client) openNewConns() { + for _, t := range me.torrents { + for len(t.Peers) != 0 { + if me.halfOpen >= me.HalfOpenLimit { + return + } + p := t.Peers[0] + t.Peers = t.Peers[1:] + me.initiateConn(p, t) + } + } +} + +func (me *client) AddPeers(infoHash InfoHash, peers []Peer) (err error) { + me.withContext(func() { + t := me.torrent(infoHash) + if t == nil { + err = errors.New("no such torrent") + return + } + t.Peers = append(t.Peers, peers...) + me.openNewConns() + }) + return +} + func (me *client) AddTorrent(metaInfo *metainfo.MetaInfo) error { torrent := &torrent{ InfoHash: BytesInfoHash(metaInfo.InfoHash), @@ -178,7 +275,7 @@ func (me *client) withContext(f func()) { me.actorTask <- f } -func (me *client) pieceHashed(ih infoHash, piece int, correct bool) { +func (me *client) pieceHashed(ih InfoHash, piece int, correct bool) { torrent := me.torrents[ih] torrent.Pieces[piece].State = func() pieceState { if correct { diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index 372d397c..476e81d7 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -5,6 +5,7 @@ import ( "flag" metainfo "github.com/nsf/libtorgo/torrent" "log" + "net" ) var ( @@ -26,6 +27,13 @@ func main() { if err != nil { log.Fatal(err) } + err = client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{{ + IP: net.IPv4(127, 0, 0, 1), + Port: 63983, + }}) + if err != nil { + log.Fatal(err) + } } client.WaitAll() client.Close() diff --git a/peer_protocol/protocol.go b/peer_protocol/protocol.go new file mode 100644 index 00000000..121da9fd --- /dev/null +++ b/peer_protocol/protocol.go @@ -0,0 +1,29 @@ +package peer_protocol + +type ( + MessageType byte + Integer uint32 +) + +const ( + Choke MessageType = iota + Unchoke + Interested + NotInterested + Have + Bitfield + RequestType + Piece + Cancel +) + +type Request struct { + Index, Begin, Length Integer +} + +type Message struct { + KeepAlive bool + Type MessageType + Bitfield []bool + Piece []byte +} diff --git a/protocol/protocol_test.go b/peer_protocol/protocol_test.go similarity index 87% rename from protocol/protocol_test.go rename to peer_protocol/protocol_test.go index 32db2a62..6306d52c 100644 --- a/protocol/protocol_test.go +++ b/peer_protocol/protocol_test.go @@ -1,4 +1,4 @@ -package protocol +package peer_protocol import ( "testing" diff --git a/protocol/protocol.go b/protocol/protocol.go deleted file mode 100644 index c4f2cb1a..00000000 --- a/protocol/protocol.go +++ /dev/null @@ -1,116 +0,0 @@ -package protocol - -import ( - "bytes" - "encoding/binary" - "errors" - // "errors" - "fmt" - "io" - "io/ioutil" - // "os" -) - -type ( - MessageType byte - Integer uint32 - PieceIndex Integer - PieceOffset Integer -) - -const ( - Choke MessageType = iota - Unchoke - Interested - NotInterested - Have - Bitfield - Request - Piece - Cancel -) - -type Message struct { - KeepAlive bool - Type MessageType - Bitfield []bool - Index PieceIndex - Begin PieceOffset - Length Integer - Piece []byte -} - -func (msg *Message) UnmarshalReader(r io.Reader) (err error) { - err = binary.Read(r, binary.BigEndian, &msg.Type) - switch err { - case nil: - msg.KeepAlive = false - case io.EOF: - msg.KeepAlive = true - err = nil - return - default: - return - } - switch msg.Type { - case Choke, Unchoke, Interested, NotInterested: - case Have: - err = binary.Read(r, binary.BigEndian, &msg.Index) - case Request, Cancel: - err = binary.Read(r, binary.BigEndian, &msg.Index) - if err != nil { - return - } - err = binary.Read(r, binary.BigEndian, &msg.Begin) - if err != nil { - return - } - err = binary.Read(r, binary.BigEndian, &msg.Length) - case Bitfield: - // var bf []byte - _, err = ioutil.ReadAll(r) - if err != nil { - return - } - case Piece: - default: - return fmt.Errorf("unknown type: %v", msg.Type) - } - return -} - -func (msg *Message) MarshalBinary() (b []byte, err error) { - w := &bytes.Buffer{} - if msg.KeepAlive { - b = w.Bytes() - return - } - err = w.WriteByte(byte(msg.Type)) - if err != nil { - return - } - switch msg.Type { - case Choke, Unchoke, Interested, NotInterested: - case Have: - err = binary.Write(w, binary.BigEndian, msg.Index) - case Request, Cancel: - err = binary.Write(w, binary.BigEndian, msg.Index) - if err != nil { - return - } - err = binary.Write(w, binary.BigEndian, msg.Begin) - if err != nil { - return - } - err = binary.Write(w, binary.BigEndian, msg.Length) - case Bitfield, Piece: - panic("unimplemented") - default: - err = errors.New("unknown type") - return - } - if err == nil { - b = w.Bytes() - } - return -} -- 2.48.1