18 "github.com/anacrolix/missinggo"
19 "github.com/anacrolix/missinggo/bitmap"
20 "github.com/anacrolix/missinggo/iter"
21 "github.com/anacrolix/missinggo/prioritybitmap"
23 "github.com/anacrolix/torrent/bencode"
24 pp "github.com/anacrolix/torrent/peer_protocol"
var optimizedCancels = expvar.NewInt("optimizedCancels")

type peerSource string

const (
	peerSourceTracker         = "T" // It's the default.
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg"
	peerSourceDHTAnnouncePeer = "Ha"
	peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn net.Conn
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	encrypted bool
	// True if the connection is over uTP rather than TCP.
	uTP bool
	// How the peer was discovered. See the peerSource constants.
	Discovery peerSource
	closed    missinggo.Event

	stats                  ConnStats
	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int
	goodPiecesDirtied      int
	badPiecesDirtied       int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// torrent info.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	postedBuffer bytes.Buffer
	writerCond   sync.Cond
}

func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the peerPieces bitmap length. Returns an error if the existing
// state is invalid, such as from receiving a badly sized BITFIELD or an
// invalid HAVE message.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, -1)
	cn.peerPiecesChanged()
	return nil
}

func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.encrypted {
		c('E')
	}
	ret += string(cn.Discovery)
	if cn.uTP {
		c('T')
	}
	return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}

func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
		cn.chunksSent,
		cn.numLocalRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
	)
	fmt.Fprintf(w, "    next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}

func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
	pb.IterTyped(func(i int) bool {
		if len(ret) >= n {
			return false
		}
		ret = append(ret, i)
		return true
	})
	return
}

func (cn *connection) Close() {
	cn.closed.Set()
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		// TODO: This call blocks sometimes, why?
		go cn.conn.Close()
	}
}

func (cn *connection) PeerHasPiece(piece int) bool {
	return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

func (cn *connection) Post(msg pp.Message) {
	postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.postedBuffer.Write(msg.MustMarshalBinary())
	cn.writerCond.Broadcast()
}

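// For illustration (a hypothetical helper, not used elsewhere): Post
// serializes the message into postedBuffer immediately, so a HAVE for piece 1
// becomes the usual wire framing of a 4-byte big-endian length prefix (5),
// the message ID (4 for HAVE), and the 4-byte piece index.
func exampleHaveWire() []byte {
	return pp.Message{
		Type:  pp.Have,
		Index: 1,
	}.MustMarshalBinary()
}
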
func (cn *connection) RequestPending(r request) bool {
	_, ok := cn.requests[r]
	return ok
}

func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

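// A minimal sketch (hypothetical helper) of the BEP 9 payload that
// requestMetadataPiece posts above: a bencoded dict carrying the extension
// message type and the metadata piece index, e.g. "d8:msg_typei0e5:piecei1ee"
// for piece 1.
func exampleMetadataRequestPayload(index int) []byte {
	b, err := bencode.Marshal(map[string]int{
		"msg_type": pp.RequestMetadataExtensionMsgType,
		"piece":    index,
	})
	if err != nil {
		panic(err) // marshalling a map[string]int can't fail
	}
	return b
}
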
// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
	ret = cn.PeerMaxRequests
	if ret > 64 {
		ret = 64
	}
	return
}

// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
	if cn.PeerRequests == nil {
		return false
	}
	if _, ok := cn.PeerRequests[r]; !ok {
		return false
	}
	delete(cn.PeerRequests, r)
	return true
}

func (cn *connection) Choke() {
	cn.Post(pp.Message{Type: pp.Choke})
	cn.PeerRequests = nil
	cn.Choked = true
}

func (cn *connection) Unchoke() {
	cn.Post(pp.Message{Type: pp.Unchoke})
	cn.Choked = false
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			}
			return pp.NotInterested
		}(),
	})
}

// Track connection writer buffer writes and flushes, to determine its
// efficiency.
var (
	connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
	connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
)

func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	rs, i := cn.desiredRequestState()
	if !cn.SetInterested(i, msg) {
		return
	}
	for r := range cn.requests {
		if _, ok := rs[r]; !ok {
			delete(cn.requests, r)
			// log.Printf("%p: cancelling request: %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Cancel,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
	}
	for r := range rs {
		if _, ok := cn.requests[r]; !ok {
			if cn.requests == nil {
				cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
			}
			cn.requests[r] = struct{}{}
			// log.Printf("%p: requesting %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Request,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
	}
}

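// A hedged sketch of the msg callback contract used by fillWriteBuffer and
// SetInterested (exampleMsgSink is hypothetical): the callback appends a
// marshalled message to a buffer and reports whether there's still room, so
// producers stop once roughly 64 KiB is buffered.
func exampleMsgSink(buf *bytes.Buffer) func(pp.Message) bool {
	return func(m pp.Message) bool {
		buf.Write(m.MustMarshalBinary())
		return buf.Len() < 1<<16
	}
}
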
// Writes buffers to the socket from the write channel.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		buf       bytes.Buffer
		lastWrite time.Time = time.Now()
	)
	var keepAliveTimer *time.Timer
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.writerCond.Broadcast()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer cn.Close()
	defer keepAliveTimer.Stop()
	for {
		buf.Write(cn.postedBuffer.Bytes())
		cn.postedBuffer.Reset()
		if buf.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				buf.Write(msg.MustMarshalBinary())
				return buf.Len() < 1<<16
			})
		}
		if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if buf.Len() == 0 {
			cn.writerCond.Wait()
			continue
		}
		cn.mu().Unlock()
		// log.Printf("writing %d bytes", buf.Len())
		n, err := cn.w.Write(buf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		if n != buf.Len() {
			panic("short write")
		}
		buf.Reset()
	}
}

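// The writer's keep-alive rule, restated as a tiny sketch (needKeepalive is
// hypothetical): a keep-alive is only worth sending once the link has been
// idle for the full timeout.
func needKeepalive(lastWrite time.Time, timeout time.Duration) bool {
	return time.Since(lastWrite) >= timeout
}
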
func (cn *connection) Have(piece int) {
	for piece >= len(cn.sentHaves) {
		cn.sentHaves = append(cn.sentHaves, false)
	}
	if cn.sentHaves[piece] {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	// Make a copy of haves, as that's read when the message is marshalled
	// without the lock. Also it obviously shouldn't change in the Msg due to
	// changes in .sentHaves.
	cn.sentHaves = append([]bool(nil), haves...)
}

func nextRequestState(
	networkingEnabled bool,
	currentRequests map[request]struct{},
	peerChoking bool,
	nextPieces *prioritybitmap.PriorityBitmap,
	pendingChunks func(piece int, f func(chunkSpec) bool) bool,
	requestsLowWater int,
	requestsHighWater int,
) (
	requests map[request]struct{},
	interested bool,
) {
	if !networkingEnabled || nextPieces.IsEmpty() {
		return nil, false
	}
	if peerChoking || len(currentRequests) > requestsLowWater {
		return currentRequests, true
	}
	requests = make(map[request]struct{}, requestsHighWater)
	for r := range currentRequests {
		requests[r] = struct{}{}
	}
	nextPieces.IterTyped(func(piece int) bool {
		return pendingChunks(piece, func(cs chunkSpec) bool {
			if len(requests) >= requestsHighWater {
				return false
			}
			r := request{pp.Integer(piece), cs}
			requests[r] = struct{}{}
			return true
		})
	})
	return requests, true
}

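// A hypothetical walk-through of nextRequestState's watermark behaviour: with
// one pending piece offering a single 16 KiB chunk, a request set below the
// low water mark is topped back up towards the high water mark, and the
// function reports that we should remain interested.
func exampleNextRequestState() (map[request]struct{}, bool) {
	var order prioritybitmap.PriorityBitmap
	order.Set(0, 0) // piece 0 at the best (lowest) priority
	pendingChunks := func(piece int, f func(chunkSpec) bool) bool {
		return f(chunkSpec{0, 16 << 10}) // one chunk at the piece start
	}
	return nextRequestState(
		true,  // networking enabled
		nil,   // no requests outstanding
		false, // peer isn't choking us
		&order,
		pendingChunks,
		16, // low water
		64, // high water
	)
}
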
func (cn *connection) updateRequests() {
	cn.writerCond.Broadcast()
}

func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
	return nextRequestState(
		cn.t.networkingEnabled,
		cn.requests,
		cn.PeerChoked,
		&cn.pieceRequestOrder,
		func(piece int, f func(chunkSpec) bool) bool {
			return undirtiedChunks(piece, cn.t, f)
		},
		cn.requestsLowWater,
		cn.nominalMaxRequests(),
	)
}

func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}

func (cn *connection) stopRequestingPiece(piece int) {
	cn.pieceRequestOrder.Remove(piece)
	cn.writerCond.Broadcast()
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and
// pseudorandomly avoids all connections requesting the same pieces, and thus
// wasting effort.
func (cn *connection) updatePiecePriority(piece int) {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		cn.stopRequestingPiece(piece)
		return
	}
	prio := cn.getPieceInclination()[piece]
	switch tpp {
	case PiecePriorityNormal:
	case PiecePriorityReadahead:
		prio -= cn.t.numPieces()
	case PiecePriorityNext, PiecePriorityNow:
		prio -= 2 * cn.t.numPieces()
	default:
		panic(tpp)
	}
	cn.pieceRequestOrder.Set(piece, prio)
	cn.updateRequests()
}

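// A hedged sketch (hypothetical helper) of the offsets updatePiecePriority
// applies above. The PriorityBitmap treats lower values as better, so with an
// inclination in [0, numPieces) the bands can't overlap: Now/Next pieces sort
// before Readahead pieces, which sort before Normal ones.
func examplePieceRequestPrio(inclination, numPieces int, tpp piecePriority) int {
	switch tpp {
	case PiecePriorityReadahead:
		return inclination - numPieces
	case PiecePriorityNext, PiecePriorityNow:
		return inclination - 2*numPieces
	default: // PiecePriorityNormal
		return inclination
	}
}
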
func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}

func (cn *connection) peerHasPieceChanged(piece int) {
	cn.updatePiecePriority(piece)
}

func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		for i := range iter.N(cn.t.numPieces()) {
			cn.peerHasPieceChanged(i)
		}
	}
}

func (cn *connection) raisePeerMinPieces(newMin int) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}

func (cn *connection) peerSentHave(piece int) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	cn.peerHasPieceChanged(piece)
	return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerHasAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted.
	cn.raisePeerMinPieces(len(bf) - 7)
	if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(i + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}

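// The padding arithmetic above as a standalone hedged example
// (exampleMinPiecesFromBitfield is hypothetical): a BITFIELD is a whole
// number of bytes, so a 3-byte field implies at least 8*3-7 = 17 pieces.
func exampleMinPiecesFromBitfield(bitfieldBytes int) int {
	return 8*bitfieldBytes - 7 // up to 7 trailing bits may be padding
}
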
func (cn *connection) peerSentHaveAll() error {
	cn.peerHasAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerHasAll = false
	cn.peerPiecesChanged()
	return nil
}

func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs["ut_metadata"] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}

func (cn *connection) wroteMsg(msg *pp.Message) {
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.stats.readMsg(msg)
	cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
	cn.stats.wroteBytes(n)
	if cn.t != nil {
		cn.t.stats.wroteBytes(n)
	}
}

func (cn *connection) readBytes(n int64) {
	cn.stats.readBytes(n)
	if cn.t != nil {
		cn.t.stats.readBytes(n)
	}
}

// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() {
		return c.PeerInterested
	}
	return c.peerHasWantedPieces()
}

func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReader(c.r),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var (
			msg pp.Message
			err error
		)
		cl.mu.Unlock()
		err = decoder.Decode(&msg)
		cl.mu.Lock()
		if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.requests = nil
			// We can then reset our interest.
			c.updateRequests()
		case pp.Reject:
			cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
			c.updateRequests()
		case pp.Unchoke:
			c.PeerChoked = false
			c.writerCond.Broadcast()
		case pp.Interested:
			c.PeerInterested = true
			c.upload()
		case pp.NotInterested:
			c.PeerInterested = false
			c.Choke()
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			if c.Choked {
				break
			}
			if !c.PeerInterested {
				err = errors.New("peer sent request but isn't interested")
				break
			}
			if !t.havePiece(msg.Index.Int()) {
				// This isn't necessarily them screwing up. We can drop pieces
				// from our storage, and can't communicate this to peers
				// except by reconnecting.
				requestsReceivedForMissingPieces.Add(1)
				err = errors.New("peer requested piece we don't have")
				break
			}
			if c.PeerRequests == nil {
				c.PeerRequests = make(map[request]struct{}, maxRequests)
			}
			c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
			c.upload()
		case pp.Cancel:
			req := newRequest(msg.Index, msg.Begin, msg.Length)
			if !c.PeerCancel(req) {
				unexpectedCancels.Add(1)
			}
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.peerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(msg.Piece)
			}
		case pp.Extended:
			switch msg.ExtendedID {
			case pp.HandshakeExtendedID:
				// TODO: Create a bencode struct for this.
				var d map[string]interface{}
				err = bencode.Unmarshal(msg.ExtendedPayload, &d)
				if err != nil {
					err = fmt.Errorf("error decoding extended message payload: %s", err)
					break
				}
				// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
				if reqq, ok := d["reqq"]; ok {
					if i, ok := reqq.(int64); ok {
						c.PeerMaxRequests = int(i)
					}
				}
				if v, ok := d["v"]; ok {
					c.PeerClientName = v.(string)
				}
				m, ok := d["m"]
				if !ok {
					err = errors.New("handshake missing m item")
					break
				}
				mTyped, ok := m.(map[string]interface{})
				if !ok {
					err = errors.New("handshake m value is not dict")
					break
				}
				if c.PeerExtensionIDs == nil {
					c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
				}
				for name, v := range mTyped {
					id, ok := v.(int64)
					if !ok {
						log.Printf("bad handshake m item extension ID type: %T", v)
						continue
					}
					if id == 0 {
						delete(c.PeerExtensionIDs, name)
					} else {
						if c.PeerExtensionIDs[name] == 0 {
							supportedExtensionMessages.Add(name, 1)
						}
						c.PeerExtensionIDs[name] = byte(id)
					}
				}
				metadata_sizeUntyped, ok := d["metadata_size"]
				if ok {
					metadata_size, ok := metadata_sizeUntyped.(int64)
					if !ok {
						log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
					} else {
						err = t.setMetadataSize(metadata_size)
						if err != nil {
							err = fmt.Errorf("error setting metadata size to %d: %s", metadata_size, err)
							break
						}
					}
				}
				if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
					c.requestPendingMetadata()
				}
			case metadataExtendedId:
				err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
				if err != nil {
					err = fmt.Errorf("error handling metadata extension message: %s", err)
				}
			case pexExtendedId:
				if cl.config.DisablePEX {
					break
				}
				var pexMsg peerExchangeMessage
				err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
				if err != nil {
					err = fmt.Errorf("error unmarshalling PEX message: %s", err)
					break
				}
				go func() {
					cl.mu.Lock()
					t.addPeers(func() (ret []Peer) {
						for i, cp := range pexMsg.Added {
							p := Peer{
								IP:     make([]byte, 4),
								Port:   cp.Port,
								Source: peerSourcePEX,
							}
							if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
								p.SupportsEncryption = true
							}
							missinggo.CopyExact(p.IP, cp.IP[:])
							ret = append(ret, p)
						}
						return
					}())
					cl.mu.Unlock()
				}()
			default:
				err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
			}
			if err != nil {
				// These clients use their own extension IDs for outgoing
				// message types, which is incorrect.
				if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
					strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
					return nil
				}
			}
		case pp.Port:
			if cl.dHT == nil {
				break
			}
			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			go cl.dHT.Ping(pingAddr, nil)
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}

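// A hedged sketch of the BEP 10 handshake payload parsed in mainReadLoop
// above (the values here are illustrative, not from the original): bencoded,
// with an "m" dict mapping extension names to the message IDs the peer wants
// used (0 disables an extension), plus optional items like "v", "reqq" and
// "metadata_size".
func exampleExtendedHandshakePayload() ([]byte, error) {
	return bencode.Marshal(map[string]interface{}{
		"m": map[string]interface{}{
			"ut_metadata": 3,
			"ut_pex":      1,
		},
		"v":             "example client 1.0",
		"reqq":          250,
		"metadata_size": 31235,
	})
}
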
// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
	t := c.t
	cl := t.cl
	chunksReceived.Add(1)

	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

	// Request has been satisfied.
	if cl.connDeleteRequest(t, c, req) {
		defer c.updateRequests()
	} else {
		unexpectedChunksReceived.Add(1)
	}

	// Do we actually want this chunk?
	if !t.wantPiece(req) {
		unwantedChunksReceived.Add(1)
		c.UnwantedChunksReceived++
		return
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.UsefulChunksReceived++
	c.lastUsefulChunkReceived = time.Now()

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.updateRequests()
	}

	cl.mu.Unlock()
	// Write the chunk out. Note that the upper bound on chunk writing
	// concurrency will be the number of connections.
	err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	cl.mu.Lock()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%x): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.updatePieceCompletion(int(msg.Index))
		return
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
	}

	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[int]struct{})
	}
	c.peerTouchedPieces[index] = struct{}{}

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))
}

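// A hedged restatement of the chunk bookkeeping above: a chunk is identified
// by its byte offset within a piece, so the index passed to unpendChunkIndex
// is just Begin divided by the chunk size. exampleChunkIndex is a
// hypothetical mirror of the package's chunkIndex helper.
func exampleChunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
	return int(cs.Begin / chunkSize)
}
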
// Also handles choking and unchoking of the remote peer.
func (c *connection) upload() {
	t := c.t
	cl := t.cl
	if cl.config.NoUpload {
		return
	}
	if !c.PeerInterested {
		return
	}
	seeding := t.seeding()
	if !seeding && !c.peerHasWantedPieces() {
		// There's no reason to upload to this peer.
		return
	}
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
		// We want to upload to the peer.
		c.Unchoke()
		for r := range c.PeerRequests {
			res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				go func() {
					time.Sleep(delay)
					cl.mu.Lock()
					defer cl.mu.Unlock()
					c.upload()
				}()
				return
			}
			err := cl.sendChunk(t, c, r)
			if err != nil {
				i := int(r.Index)
				if t.pieceComplete(i) {
					t.updatePieceCompletion(i)
					if !t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Printf("error sending chunk %+v to peer: %s", r, err)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			goto another
		}
		return
	}
	c.Choke()
}

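// A minimal sketch of the reservation pattern upload uses, assuming
// cl.uploadLimit is a golang.org/x/time/rate Limiter (that import isn't part
// of this excerpt): reserve tokens for the chunk, and if the limiter wants us
// to wait, cancel the reservation and retry later instead of blocking with
// the client lock held.
func exampleReserveUpload(l *rate.Limiter, chunkLen int) (ok bool, retryIn time.Duration) {
	res := l.ReserveN(time.Now(), chunkLen)
	if !res.OK() {
		return false, 0 // chunkLen exceeds the limiter's burst
	}
	if d := res.Delay(); d > 0 {
		res.Cancel() // return the tokens; the caller retries after d
		return false, d
	}
	return true, 0
}
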
func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
	return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}