"syscall"
"time"
+ "bitbucket.org/anacrolix/go.torrent/mse"
+
"bitbucket.org/anacrolix/go.torrent/data"
filePkg "bitbucket.org/anacrolix/go.torrent/data/file"
"bitbucket.org/anacrolix/go.torrent/dht"
//
// Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
// DHT: http://www.bittorrent.org/beps/bep_0005.html
- // Fast Extension: http://bittorrent.org/beps/bep_0006.html
- extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
+ // Fast Extension: http://bittorrent.org/beps/bep_0006.html ([7]|=4)
+ defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
socketsPerTorrent = 40
torrentPeersHighWater = 200
// Limit how long handshake can take. This is to reduce the lingering
// impact of a few bad apples. 4s loses 1% of successful handshakes that
// are obtained with 60s timeout, and 5% of unsuccessful handshakes.
- handshakeTimeout = 4 * time.Second
+ handshakeTimeout = 45 * time.Second
pruneInterval = 10 * time.Second
)
_configDir string
config Config
pruneTimer *time.Timer
+ extensionBytes peerExtensionBytes
torrentDataOpener TorrentDataOpener
quit: make(chan struct{}),
torrents: make(map[InfoHash]*torrent),
}
+ CopyExact(&cl.extensionBytes, defaultExtensionBytes)
cl.event.L = &cl.mu
if cfg.TorrentDataOpener != nil {
cl.torrentDataOpener = cfg.TorrentDataOpener
return string(ip.To16()), nil
}
-func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
+func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
var err error
for b := range bb {
_, err = w.Write(b)
if err != nil {
- w.Close()
break
}
}
peerID [20]byte
)
+func (me *peerExtensionBytes) SupportsExtended() bool {
+ return me[5]&0x10 != 0
+}
+
+func (me *peerExtensionBytes) SupportsDHT() bool {
+ return me[7]&0x01 != 0
+}
+
+func (me *peerExtensionBytes) SupportsFast() bool {
+ return me[7]&0x04 != 0
+}
+
type handshakeResult struct {
peerExtensionBytes
peerID
// peer initiated the connection. Returns ok if the handshake was successful,
// and err if there was an unexpected condition other than the peer simply
// abandoning the handshake.
-func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
+func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
// Bytes to be sent to the peer. Should never block the sender.
postCh := make(chan []byte, 4)
// A single error value sent when the writer completes.
}
post([]byte(pp.Protocol))
- post([]byte(extensionBytes))
+ post(extensions[:])
if ih != nil { // We already know what we want.
post(ih[:])
post(peerID[:])
me.mu.Lock()
me.handshaking++
me.mu.Unlock()
- hsRes, ok, err := handshake(sock, func() *InfoHash {
+ var rw io.ReadWriter = sock
+ if torrent == nil {
+ rw, err = mse.ReceiveHandshake(sock, func() (ret [][]byte) {
+ for ih := range me.torrents {
+ ret = append(ret, ih[:])
+ }
+ return
+ }())
+ } else {
+ rw, err = mse.InitiateHandshake(sock, torrent.InfoHash[:])
+ }
+ if err != nil {
+ err = fmt.Errorf("error during MSE handshake: %s", err)
+ return
+ }
+ hsRes, ok, err := handshake(rw, func() *InfoHash {
if torrent == nil {
return nil
} else {
return &torrent.InfoHash
}
- }(), me.peerID)
+ }(), me.peerID, me.extensionBytes)
me.mu.Lock()
defer me.mu.Unlock()
if me.handshaking == 0 {
}
sock.SetWriteDeadline(time.Time{})
sock = peerConn{sock}
- conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
+ conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP, rw)
defer conn.Close()
conn.Discovery = discovery
if !me.addConnection(torrent, conn) {
return
}
- if conn.PeerExtensionBytes[5]&0x10 != 0 {
+ if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
conn.Post(pp.Message{
Type: pp.Extended,
ExtendedID: pp.HandshakeExtendedID,
if p := me.incomingPeerPort(); p != 0 {
d["p"] = p
}
- yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
+ yourip, err := addrCompactIP(conn.remoteAddr())
if err != nil {
log.Printf("error calculating yourip field value in extension handshake: %s", err)
} else {
Type: pp.Bitfield,
Bitfield: torrent.bitfield(),
})
+ } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
+ conn.Post(pp.Message{
+ Type: pp.HaveNone,
+ })
}
- if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
+ if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
conn.Post(pp.Message{
Type: pp.Port,
Port: uint16(AddrPort(me.dHT.LocalAddr())),
}
if torrent.haveInfo() {
torrent.initRequestOrdering(conn)
- me.replenishConnRequests(torrent, conn)
}
err = me.connectionLoop(torrent, conn)
if err != nil {
// and exit.
func (me *Client) connectionLoop(t *torrent, c *connection) error {
decoder := pp.Decoder{
- R: bufio.NewReader(c.Socket),
+ R: bufio.NewReader(c.rw),
MaxLength: 256 * 1024,
}
for {
me.replenishConnRequests(t, c)
case pp.Reject:
me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
+ me.replenishConnRequests(t, c)
case pp.Unchoke:
c.PeerChoked = false
me.peerUnchoked(t, c)
if me.dHT == nil {
break
}
- pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
+ pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
if err != nil {
panic(err)
}
// Maintains the state of a connection with a peer.
type connection struct {
- Socket net.Conn
+ conn net.Conn
+ rw io.ReadWriter // The real slim shady
Discovery peerSource
uTP bool
closing chan struct{}
PeerClientName string
}
-func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
+func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool, rw io.ReadWriter) (c *connection) {
c = &connection{
- Socket: sock,
- uTP: uTP,
+ conn: sock,
+ rw: rw,
+ uTP: uTP,
Choked: true,
PeerChoked: true,
return
}
+func (cn *connection) remoteAddr() net.Addr {
+ return cn.conn.RemoteAddr()
+}
+
+func (cn *connection) localAddr() net.Addr {
+ return cn.conn.LocalAddr()
+}
+
func (cn *connection) pendPiece(piece int, priority piecePriority) {
if priority == piecePriorityNone {
cn.pieceRequestOrder.DeletePiece(piece)
func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
// \t isn't preserved in <pre> blocks?
- fmt.Fprintf(w, "%s\n %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
+ fmt.Fprintf(w, "%s\n %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.localAddr(), cn.remoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
c := func(b byte) {
fmt.Fprintf(w, "%c", b)
}
}
close(c.closing)
// TODO: This call blocks sometimes, why?
- go c.Socket.Close()
+ go c.conn.Close()
}
func (c *connection) PeerHasPiece(piece int) bool {
// Writes buffers to the socket from the write channel.
func (conn *connection) writer() {
// Reduce write syscalls.
- buf := bufio.NewWriterSize(conn.Socket, 0x8000) // 32 KiB
+ buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
// Receives when buf is not empty.
notEmpty := make(chan struct{}, 1)
for {