"container/heap"
"crypto/rand"
"crypto/sha1"
+ "encoding/hex"
"errors"
"expvar"
"fmt"
successfulDials = expvar.NewInt("successfulDials")
acceptedConns = expvar.NewInt("acceptedConns")
inboundConnsBlocked = expvar.NewInt("inboundConnsBlocked")
+ peerExtensions = expvar.NewMap("peerExtensions")
)
const (
//
// Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
// DHT: http://www.bittorrent.org/beps/bep_0005.html
- extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
+ // Fast Extension: http://bittorrent.org/beps/bep_0006.html
+ extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
socketsPerTorrent = 40
torrentPeersHighWater = 200
CopyExact(&res.peerExtensionBytes, b[20:28])
CopyExact(&res.InfoHash, b[28:48])
CopyExact(&res.peerID, b[48:68])
+ peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
+ // TODO: Maybe we can just drop peers here if we're not interested. This
+ // could prevent them trying to reconnect, falsely believing there was
+ // just a problem.
if ih == nil { // We were waiting for the peer to tell us what they wanted.
post(res.InfoHash[:])
post(peerID[:])
c.piecePriorities = mathRand.Perm(t.numPieces())
c.pieceRequestOrder = pieceordering.New()
for i := 0; i < t.numPieces(); i++ {
- if !c.PeerHasPiece(pp.Integer(i)) {
+ if !c.PeerHasPiece(i) {
continue
}
if !t.wantPiece(i) {
}
func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
- if t.haveInfo() {
- if c.PeerPieces == nil {
- c.PeerPieces = make([]bool, t.numPieces())
- }
- } else {
- for piece >= len(c.PeerPieces) {
- c.PeerPieces = append(c.PeerPieces, false)
+ if !c.peerHasAll {
+ if t.haveInfo() {
+ if c.PeerPieces == nil {
+ c.PeerPieces = make([]bool, t.numPieces())
+ }
+ } else {
+ for piece >= len(c.PeerPieces) {
+ c.PeerPieces = append(c.PeerPieces, false)
+ }
}
+ c.PeerPieces[piece] = true
}
- c.PeerPieces[piece] = true
if t.wantPiece(piece) {
t.connPendPiece(c, piece)
me.replenishConnRequests(t, c)
return AddrPort(addr)
}
+func (cl *Client) peerHasAll(t *torrent, cn *connection) {
+ cn.peerHasAll = true
+ cn.PeerPieces = nil
+ if t.haveInfo() {
+ for i := 0; i < t.numPieces(); i++ {
+ cl.peerGotPiece(t, cn, i)
+ }
+ }
+}
+
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit.
func (me *Client) connectionLoop(t *torrent, c *connection) error {
}
// We can then reset our interest.
me.replenishConnRequests(t, c)
+ case pp.Reject:
+ me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
case pp.Unchoke:
c.PeerChoked = false
me.peerUnchoked(t, c)
unexpectedCancels.Add(1)
}
case pp.Bitfield:
- if c.PeerPieces != nil {
+ if c.PeerPieces != nil || c.peerHasAll {
err = errors.New("received unexpected bitfield")
break
}
me.peerGotPiece(t, c, index)
}
}
+ case pp.HaveAll:
+ if c.PeerPieces != nil || c.peerHasAll {
+ err = errors.New("unexpected have-all")
+ break
+ }
+ me.peerHasAll(t, c)
+ case pp.HaveNone:
+ if c.peerHasAll || c.PeerPieces != nil {
+ err = errors.New("unexpected have-none")
+ break
+ }
+ c.PeerPieces = make([]bool, func() int {
+ if t.haveInfo() {
+ return t.numPieces()
+ } else {
+ return 0
+ }
+ }())
case pp.Piece:
err = me.downloadedChunk(t, c, &msg)
case pp.Extended:
}
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
pieceIndex := e.Piece()
- if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
+ if !c.PeerHasPiece(pieceIndex) {
panic("piece in request order but peer doesn't have it")
}
if !t.wantPiece(pieceIndex) {
}
conn.pieceRequestOrder.DeletePiece(int(piece))
}
- if t.wantPiece(int(piece)) && conn.PeerHasPiece(pp.Integer(piece)) {
+ if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
t.connPendPiece(conn, int(piece))
me.replenishConnRequests(t, conn)
}
{5 * 1024 * 1024, 1048576, 4},
} {
if readaheadPieces(case_.readaheadBytes, case_.pieceLength) != case_.readaheadPieces {
- t.Fatalf("case failed: %s", case_)
+ t.Fatalf("case failed: %v", case_)
}
}
}
PeerExtensionBytes peerExtensionBytes
// Whether the peer has the given piece. nil if they've not sent any
// related messages yet.
- PeerPieces []bool
+ PeerPieces []bool
+ peerHasAll bool
+
PeerMaxRequests int // Maximum pending requests the peer allows.
PeerExtensionIDs map[string]int64
PeerClientName string
return ok
}
-func (cn *connection) completedString() string {
- if cn.PeerPieces == nil {
+func (cn *connection) completedString(t *torrent) string {
+ if cn.PeerPieces == nil && !cn.peerHasAll {
return "?"
}
- // f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
- // return fmt.Sprintf("%d%%", int(f*100))
- return fmt.Sprintf("%d/%d", cn.piecesPeerHasCount(), cn.totalPiecesCount())
-}
-
-func (cn *connection) totalPiecesCount() int {
- return len(cn.PeerPieces)
-}
-
-func (cn *connection) piecesPeerHasCount() (count int) {
- for _, has := range cn.PeerPieces {
- if has {
- count++
+ return fmt.Sprintf("%d/%d", func() int {
+ if cn.peerHasAll {
+ if t.haveInfo() {
+ return t.numPieces()
+ }
+ return -1
}
- }
- return
+ ret := 0
+ for _, b := range cn.PeerPieces {
+ if b {
+ ret++
+ }
+ }
+ return ret
+ }(), func() int {
+ if cn.peerHasAll || cn.PeerPieces == nil {
+ if t.haveInfo() {
+ return t.numPieces()
+ }
+ return -1
+ }
+ return len(cn.PeerPieces)
+ }())
}
// Correct the PeerPieces slice length. Return false if the existing slice is
// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
// messages.
func (cn *connection) setNumPieces(num int) error {
+ if cn.peerHasAll {
+ return nil
+ }
if cn.PeerPieces == nil {
return nil
}
return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
}
-func (cn *connection) WriteStatus(w io.Writer) {
+func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
// \t isn't preserved in <pre> blocks?
- fmt.Fprintf(w, "%s\n %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
+ fmt.Fprintf(w, "%s\n %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
c := func(b byte) {
fmt.Fprintf(w, "%c", b)
}
go c.Socket.Close()
}
-func (c *connection) PeerHasPiece(index pp.Integer) bool {
- if c.PeerPieces == nil {
- return false
+func (c *connection) PeerHasPiece(piece int) bool {
+ if c.peerHasAll {
+ return true
}
- if int(index) >= len(c.PeerPieces) {
+ if piece >= len(c.PeerPieces) {
return false
}
- return c.PeerPieces[index]
+ return c.PeerPieces[piece]
}
func (c *connection) Post(msg pp.Message) {
if len(c.Requests) >= c.PeerMaxRequests {
return false
}
- if !c.PeerHasPiece(chunk.Index) {
+ if !c.PeerHasPiece(int(chunk.Index)) {
return true
}
if c.RequestPending(chunk) {
Piece // 7
Cancel // 8
Port // 9
- Extended = 20
+
+ // BEP 6
+ Suggest = 0xd // 13
+ HaveAll = 0xe // 14
+ HaveNone = 0xf // 15
+ Reject = 0x10 // 16
+ AllowedFast = 0x11 // 17
+
+ Extended = 20
HandshakeExtendedID = 0
return
}
switch msg.Type {
- case Choke, Unchoke, Interested, NotInterested:
+ case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
case Have:
err = binary.Write(buf, binary.BigEndian, msg.Index)
- case Request, Cancel:
+ case Request, Cancel, Reject:
for _, i := range []Integer{msg.Index, msg.Begin, msg.Length} {
err = binary.Write(buf, binary.BigEndian, i)
if err != nil {
}
msg.Type = MessageType(c)
switch msg.Type {
- case Choke, Unchoke, Interested, NotInterested:
+ case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
return
case Have:
err = msg.Index.Read(r)
- case Request, Cancel:
+ case Request, Cancel, Reject:
for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
err = data.Read(r)
if err != nil {
t: t,
})
for _, c := range t.Conns {
- c.WriteStatus(w)
+ c.WriteStatus(w, t)
}
}
}
func (t *torrent) numPieces() int {
- return len(t.Info.Pieces) / 20
+ return t.Info.NumPieces()
}
func (t *torrent) numPiecesCompleted() (num int) {
func (t *torrent) connHasWantedPieces(c *connection) bool {
for p := range t.Pieces {
- if t.wantPiece(p) && c.PeerHasPiece(pp.Integer(p)) {
+ if t.wantPiece(p) && c.PeerHasPiece(p) {
return true
}
}