From: Matt Joiner Date: Sun, 29 Sep 2013 06:45:17 +0000 (+1000) Subject: Implement peer connection writer, optimizer and handshake X-Git-Tag: v1.0.0~1811 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=abf5c92488ec4cdd5242a06dabf0c16fdc53610d;p=btrtrc.git Implement peer connection writer, optimizer and handshake --- diff --git a/client.go b/client.go index 03b02998..284c2c1c 100644 --- a/client.go +++ b/client.go @@ -2,6 +2,7 @@ package torrent import ( "bitbucket.org/anacrolix/go.torrent/peer_protocol" + "container/list" "crypto" "crypto/rand" "errors" @@ -51,6 +52,7 @@ type piece struct { type connection struct { Socket net.Conn post chan peer_protocol.Message + write chan []byte Interested bool Choked bool @@ -60,6 +62,43 @@ type connection struct { PeerInterested bool PeerChoked bool PeerRequests []peer_protocol.Request + PeerExtensions [8]byte +} + +func (conn *connection) writer() { + for { + b := <-conn.write + log.Printf("writing %#v", string(b)) + n, err := conn.Socket.Write(b) + if err != nil { + log.Print(err) + close(conn.write) + break + } + if n != len(b) { + panic("didn't write all bytes") + } + } +} + +func (conn *connection) writeOptimizer() { + pending := list.New() + var nextWrite []byte + for { + write := conn.write + if pending.Len() == 0 { + write = nil + nextWrite = nil + } else { + nextWrite = pending.Front().Value.(peer_protocol.Message).Encode() + } + select { + case msg := <-conn.post: + pending.PushBack(msg) + case write <- nextWrite: + pending.Remove(pending.Front()) + } + } } type torrent struct { @@ -213,12 +252,57 @@ func (me *client) initiateConn(peer Peer, torrent *torrent) { log.Printf("error connecting to peer: %s", err) return } - me.runConnection(conn, torrent, peer.Id) + log.Printf("connected to %s", sock.RemoteAddr()) + me.handshake(conn, torrent, peer.Id) }() } -func (me *client) runConnection(conn net.Conn, torrent *torrent, peerId [20]byte) { - log.Fatalf("connected to %s", conn) +func (me *client) handshake(sock net.Conn, torrent *torrent, peerId [20]byte) { + conn := &connection{ + Socket: sock, + Choked: true, + PeerChoked: true, + write: make(chan []byte), + post: make(chan peer_protocol.Message), + } + go conn.writer() + go conn.writeOptimizer() + conn.post <- peer_protocol.Bytes(peer_protocol.Protocol) + conn.post <- peer_protocol.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00") + if torrent != nil { + conn.post <- peer_protocol.Bytes(torrent.InfoHash[:]) + conn.post <- peer_protocol.Bytes(me.PeerId[:]) + } + var b [28]byte + _, err := io.ReadFull(conn.Socket, b[:]) + if err != nil { + log.Fatal(err) + } + if string(b[:20]) != peer_protocol.Protocol { + log.Printf("wrong protocol: %#v", string(b[:20])) + return + } + if 8 != copy(conn.PeerExtensions[:], b[20:]) { + panic("wtf") + } + log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:])) + var infoHash [20]byte + _, err = io.ReadFull(conn.Socket, infoHash[:]) + if err != nil { + return + } + _, err = io.ReadFull(conn.Socket, conn.PeerId[:]) + if err != nil { + return + } + if torrent == nil { + torrent = me.torrent(infoHash) + if torrent == nil { + return + } + conn.post <- peer_protocol.Bytes(torrent.InfoHash[:]) + conn.post <- peer_protocol.Bytes(me.PeerId[:]) + } } func (me *client) openNewConns() { diff --git a/peer_protocol/protocol.go b/peer_protocol/protocol.go index 121da9fd..bc34d47b 100644 --- a/peer_protocol/protocol.go +++ b/peer_protocol/protocol.go @@ -5,6 +5,10 @@ type ( Integer uint32 ) +const ( + Protocol = "\x13BitTorrent protocol" +) + const ( Choke MessageType = iota Unchoke @@ -21,9 +25,12 @@ type Request struct { Index, Begin, Length Integer } -type Message struct { - KeepAlive bool - Type MessageType - Bitfield []bool - Piece []byte +type Message interface { + Encode() []byte +} + +type Bytes []byte + +func (b Bytes) Encode() []byte { + return b }