import (
"bitbucket.org/anacrolix/go.torrent/peer_protocol"
+ "container/list"
"crypto"
"crypto/rand"
"errors"
type connection struct {
Socket net.Conn
post chan peer_protocol.Message
+ write chan []byte
Interested bool
Choked bool
PeerInterested bool
PeerChoked bool
PeerRequests []peer_protocol.Request
+ PeerExtensions [8]byte
+}
+
+func (conn *connection) writer() {
+ for {
+ b := <-conn.write
+ log.Printf("writing %#v", string(b))
+ n, err := conn.Socket.Write(b)
+ if err != nil {
+ log.Print(err)
+ close(conn.write)
+ break
+ }
+ if n != len(b) {
+ panic("didn't write all bytes")
+ }
+ }
+}
+
+func (conn *connection) writeOptimizer() {
+ pending := list.New()
+ var nextWrite []byte
+ for {
+ write := conn.write
+ if pending.Len() == 0 {
+ write = nil
+ nextWrite = nil
+ } else {
+ nextWrite = pending.Front().Value.(peer_protocol.Message).Encode()
+ }
+ select {
+ case msg := <-conn.post:
+ pending.PushBack(msg)
+ case write <- nextWrite:
+ pending.Remove(pending.Front())
+ }
+ }
}
type torrent struct {
log.Printf("error connecting to peer: %s", err)
return
}
- me.runConnection(conn, torrent, peer.Id)
+ log.Printf("connected to %s", sock.RemoteAddr())
+ me.handshake(conn, torrent, peer.Id)
}()
}
-func (me *client) runConnection(conn net.Conn, torrent *torrent, peerId [20]byte) {
- log.Fatalf("connected to %s", conn)
+func (me *client) handshake(sock net.Conn, torrent *torrent, peerId [20]byte) {
+ conn := &connection{
+ Socket: sock,
+ Choked: true,
+ PeerChoked: true,
+ write: make(chan []byte),
+ post: make(chan peer_protocol.Message),
+ }
+ go conn.writer()
+ go conn.writeOptimizer()
+ conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
+ conn.post <- peer_protocol.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
+ if torrent != nil {
+ conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
+ conn.post <- peer_protocol.Bytes(me.PeerId[:])
+ }
+ var b [28]byte
+ _, err := io.ReadFull(conn.Socket, b[:])
+ if err != nil {
+ log.Fatal(err)
+ }
+ if string(b[:20]) != peer_protocol.Protocol {
+ log.Printf("wrong protocol: %#v", string(b[:20]))
+ return
+ }
+ if 8 != copy(conn.PeerExtensions[:], b[20:]) {
+ panic("wtf")
+ }
+ log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
+ var infoHash [20]byte
+ _, err = io.ReadFull(conn.Socket, infoHash[:])
+ if err != nil {
+ return
+ }
+ _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
+ if err != nil {
+ return
+ }
+ if torrent == nil {
+ torrent = me.torrent(infoHash)
+ if torrent == nil {
+ return
+ }
+ conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
+ conn.post <- peer_protocol.Bytes(me.PeerId[:])
+ }
}
func (me *client) openNewConns() {