	"github.com/anacrolix/missinggo"
	"github.com/anacrolix/missinggo/bitmap"
	"github.com/anacrolix/missinggo/iter"
	"github.com/anacrolix/missinggo/prioritybitmap"

	"github.com/anacrolix/torrent/bencode"
	pp "github.com/anacrolix/torrent/peer_protocol"
type peerSource string

const (
	peerSourceTracker         = "T" // It's the default.
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg"
	peerSourceDHTAnnouncePeer = "Ha"
	peerSourcePEX             = "X"
)
// Maintains the state of a connection with a peer.
type connection struct {
	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn net.Conn
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	encrypted bool
	uTP       bool
	Discovery peerSource
	closed    missinggo.Event

	stats                  ConnStats
	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int
	goodPiecesDirtied      int
	badPiecesDirtied       int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// metainfo.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	postedBuffer bytes.Buffer
	writerCond   sync.Cond
}
func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}
// Correct the bounds of the peer's piece bitmap. Returns an error if the
// existing state is invalid, such as from a badly sized BITFIELD or an
// invalid HAVE message.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, -1) // -1 means through to the end.
	cn.peerPiecesChanged()
	return nil
}
func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}
func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.encrypted {
		c('E')
	}
	ret += string(cn.Discovery)
	if cn.uTP {
		c('T')
	}
	return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}
func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}
func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		" %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
		cn.chunksSent,
		cn.numLocalRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
	)
	fmt.Fprintf(w, " next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}
func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
	pb.IterTyped(func(i int) bool {
		if len(ret) >= n {
			return false
		}
		ret = append(ret, i)
		return true
	})
	return
}
func (cn *connection) Close() {
	cn.closed.Set()
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		// TODO: This call blocks sometimes, why?
		go cn.conn.Close()
	}
}
func (cn *connection) PeerHasPiece(piece int) bool {
	return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

func (cn *connection) Post(msg pp.Message) {
	messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.postedBuffer.Write(msg.MustMarshalBinary())
	cn.writerCond.Broadcast()
}
func (cn *connection) RequestPending(r request) bool {
	_, ok := cn.requests[r]
	return ok
}
func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}
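// For reference, a sketch (not part of the original file) of the BEP 9 dict
// that requestMetadataPiece posts above. Assuming
// pp.RequestMetadataExtensionMsgType is the BEP 9 "request" type (0), the
// wire bytes for piece 1 would be "d8:msg_typei0e5:piecei1ee".
func exampleMetadataRequestPayload() []byte {
	b, err := bencode.Marshal(map[string]int{
		"msg_type": pp.RequestMetadataExtensionMsgType,
		"piece":    1,
	})
	if err != nil {
		panic(err)
	}
	return b
}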
// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
	ret = cn.PeerMaxRequests
	if ret > 64 {
		ret = 64
	}
	return
}
// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
	if cn.PeerRequests == nil {
		return false
	}
	if _, ok := cn.PeerRequests[r]; !ok {
		return false
	}
	delete(cn.PeerRequests, r)
	return true
}
func (cn *connection) Choke() {
	if cn.Choked {
		return
	}
	cn.Post(pp.Message{
		Type: pp.Choke,
	})
	cn.PeerRequests = nil
	cn.Choked = true
}

func (cn *connection) Unchoke() {
	if !cn.Choked {
		return
	}
	cn.Post(pp.Message{
		Type: pp.Unchoke,
	})
	cn.Choked = false
}
func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			}
			return pp.NotInterested
		}(),
	})
}
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	numFillBuffers.Add(1)
	rs, i := cn.desiredRequestState()
	if !cn.SetInterested(i, msg) {
		return
	}
	sentCancels := false
	for r := range cn.requests {
		if _, ok := rs[r]; !ok {
			sentCancels = true
			delete(cn.requests, r)
			// log.Printf("%p: cancelling request: %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Cancel,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
	}
	if sentCancels {
		fillBufferSentCancels.Add(1)
	}
	sentRequests := false
	for r := range rs {
		if _, ok := cn.requests[r]; !ok {
			if cn.requests == nil {
				cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
			}
			cn.requests[r] = struct{}{}
			sentRequests = true
			// log.Printf("%p: requesting %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Request,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
	}
	if sentRequests {
		fillBufferSentRequests.Add(1)
	}
}
// Writes buffers to the socket from the write channel.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		buf       bytes.Buffer
		lastWrite time.Time = time.Now()
	)
	var keepAliveTimer *time.Timer
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.writerCond.Broadcast()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer keepAliveTimer.Stop()
	for {
		buf.Write(cn.postedBuffer.Bytes())
		cn.postedBuffer.Reset()
		if buf.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				buf.Write(msg.MustMarshalBinary())
				return buf.Len() < 1<<16
			})
		}
		if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if buf.Len() == 0 {
			cn.writerCond.Wait()
			continue
		}
		cn.mu().Unlock()
		// log.Printf("writing %d bytes", buf.Len())
		n, err := cn.w.Write(buf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		buf.Reset()
	}
}
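// A standalone sketch (not part of the original file) of the keep-alive
// scheme writer uses above: a recurring timer takes the same lock as the
// writer loop and nudges it through the condition variable whenever the
// socket has been idle for a full keepAliveTimeout; the loop then posts a
// keep-alive message. All names here are hypothetical.
func exampleKeepAliveNudge(mu sync.Locker, writerCond *sync.Cond, lastWrite *time.Time, timeout time.Duration) (stop func() bool) {
	var timer *time.Timer
	timer = time.AfterFunc(timeout, func() {
		mu.Lock()
		defer mu.Unlock()
		if time.Since(*lastWrite) >= timeout {
			// Wake the writer; it will see an empty buffer and an expired
			// idle deadline, and write a keep-alive.
			writerCond.Broadcast()
		}
		timer.Reset(timeout)
	})
	return timer.Stop
}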
func (cn *connection) Have(piece int) {
	for piece >= len(cn.sentHaves) {
		cn.sentHaves = append(cn.sentHaves, false)
	}
	if cn.sentHaves[piece] {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves[piece] = true
}
func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	// Make a copy of haves, as the message is marshalled without the lock:
	// the copy must not be affected by subsequent changes to .sentHaves.
	cn.sentHaves = append([]bool(nil), haves...)
}
func nextRequestState(
	networkingEnabled bool,
	currentRequests map[request]struct{},
	peerChoking bool,
	nextPieces *prioritybitmap.PriorityBitmap,
	pendingChunks func(piece int, f func(chunkSpec) bool) bool,
	requestsLowWater int,
	requestsHighWater int,
) (
	requests map[request]struct{},
	interested bool,
) {
	if !networkingEnabled || nextPieces.IsEmpty() {
		return nil, false
	}
	if peerChoking || len(currentRequests) > requestsLowWater {
		return currentRequests, true
	}
	requests = make(map[request]struct{}, requestsHighWater)
	for r := range currentRequests {
		requests[r] = struct{}{}
	}
	nextPieces.IterTyped(func(piece int) bool {
		return pendingChunks(piece, func(cs chunkSpec) bool {
			if len(requests) >= requestsHighWater {
				return false
			}
			r := request{pp.Integer(piece), cs}
			requests[r] = struct{}{}
			return true
		})
	})
	return requests, true
}
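// A minimal sketch (not part of the original file) of exercising
// nextRequestState in isolation. It assumes the elided parameter between
// currentRequests and nextPieces is peerChoking, as suggested by the test at
// the top of the function, and it assumes chunkSpec's Begin/Length fields.
// Every piece is pretended to have two pending 16 KiB chunks.
func exampleNextRequestState() {
	var pieces prioritybitmap.PriorityBitmap
	pieces.Set(3, 0)  // piece 3 at some baseline priority
	pieces.Set(7, -1) // piece 7 sorts first: lower values are more urgent
	pendingChunks := func(piece int, f func(chunkSpec) bool) bool {
		for _, cs := range []chunkSpec{
			{Begin: 0, Length: 0x4000},
			{Begin: 0x4000, Length: 0x4000},
		} {
			if !f(cs) {
				return false
			}
		}
		return true
	}
	requests, interested := nextRequestState(
		true,  // networking enabled
		nil,   // nothing requested yet
		false, // peer is not choking us
		&pieces,
		pendingChunks,
		16, // low water
		64, // high water
	)
	// All four chunks fit under the high water mark, and we're interested.
	log.Printf("%d requests, interested=%v", len(requests), interested)
}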
func (cn *connection) updateRequests() {
	cn.writerCond.Broadcast()
}

func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
	return nextRequestState(
		cn.t.networkingEnabled,
		cn.requests,
		cn.PeerChoked,
		&cn.pieceRequestOrder,
		func(piece int, f func(chunkSpec) bool) bool {
			return undirtiedChunks(piece, cn.t, f)
		},
		cn.requestsLowWater,
		cn.nominalMaxRequests(),
	)
}
func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}
func (cn *connection) stopRequestingPiece(piece int) {
	cn.pieceRequestOrder.Remove(piece)
	cn.writerCond.Broadcast()
}
// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and
// pseudorandomly avoids connections always requesting the same pieces and
// thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		cn.stopRequestingPiece(piece)
		return
	}
	prio := cn.getPieceInclination()[piece]
	switch tpp {
	case PiecePriorityNormal:
	case PiecePriorityReadahead:
		prio -= cn.t.numPieces()
	case PiecePriorityNext, PiecePriorityNow:
		prio -= 2 * cn.t.numPieces()
	}
	cn.pieceRequestOrder.Set(piece, prio)
}
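// Worked example (not part of the original file, and assuming the package's
// piecePriority type): for a 100-piece torrent, a piece whose
// connection-local inclination is 42 gets priority 42 when Normal, -58 when
// Readahead, and -158 when Next/Now, so more urgent classes always sort
// ahead of less urgent ones regardless of inclination.
func examplePiecePriority(inclination, numPieces int, tpp piecePriority) int {
	prio := inclination
	switch tpp {
	case PiecePriorityNormal:
	case PiecePriorityReadahead:
		prio -= numPieces
	case PiecePriorityNext, PiecePriorityNow:
		prio -= 2 * numPieces
	}
	return prio
}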
func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}
func (cn *connection) peerHasPieceChanged(piece int) {
	cn.updatePiecePriority(piece)
}

func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		for i := range iter.N(cn.t.numPieces()) {
			cn.peerHasPieceChanged(i)
		}
	}
}

func (cn *connection) raisePeerMinPieces(newMin int) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}
func (cn *connection) peerSentHave(piece int) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	cn.peerHasPieceChanged(piece)
	return nil
}
func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerHasAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted.
	cn.raisePeerMinPieces(len(bf) - 7)
	if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(i + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}
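// Worked example (not part of the original file): a 2-byte BITFIELD decodes
// to len(bf) == 16 bools. Only the trailing bits of the final byte can be
// padding, so the peer must have at least 16-7 == 9 pieces; that is what
// raisePeerMinPieces(len(bf)-7) records above, before any set bits are
// examined.
func exampleBitfieldMinPieces() int {
	bf := make([]bool, 16) // decoded from 2 bitfield bytes
	return len(bf) - 7     // == 9
}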
func (cn *connection) peerSentHaveAll() error {
	cn.peerHasAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerHasAll = false
	cn.peerPiecesChanged()
	return nil
}
func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs["ut_metadata"] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}
func (cn *connection) wroteMsg(msg *pp.Message) {
	messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.stats.readMsg(msg)
	cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
	cn.stats.wroteBytes(n)
	if cn.t != nil {
		cn.t.stats.wroteBytes(n)
	}
}

func (cn *connection) readBytes(n int64) {
	cn.stats.readBytes(n)
	if cn.t != nil {
		cn.t.stats.readBytes(n)
	}
}
// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() {
		return c.PeerInterested
	}
	return c.peerHasWantedPieces()
}
func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReader(c.r),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	var err error
	for {
		cl.mu.Unlock()
		var msg pp.Message
		err = decoder.Decode(&msg)
		cl.mu.Lock()
		if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.requests = nil
			// We can then reset our interest.
			c.updateRequests()
		case pp.Reject:
			cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
			c.updateRequests()
		case pp.Unchoke:
			c.PeerChoked = false
			c.writerCond.Broadcast()
		case pp.Interested:
			c.PeerInterested = true
			c.upload()
		case pp.NotInterested:
			c.PeerInterested = false
			c.Choke()
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			if c.Choked {
				break
			}
			if !c.PeerInterested {
				err = errors.New("peer sent request but isn't interested")
				break
			}
			if !t.havePiece(msg.Index.Int()) {
				// This isn't necessarily them screwing up. We can drop pieces
				// from our storage, and can't communicate this to peers
				// except by reconnecting.
				requestsReceivedForMissingPieces.Add(1)
				err = errors.New("peer requested piece we don't have")
				break
			}
			if c.PeerRequests == nil {
				c.PeerRequests = make(map[request]struct{}, maxRequests)
			}
			c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
			c.upload()
		case pp.Cancel:
			req := newRequest(msg.Index, msg.Begin, msg.Length)
			if !c.PeerCancel(req) {
				unexpectedCancels.Add(1)
			}
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.peerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(msg.Piece)
			}
		case pp.Extended:
			switch msg.ExtendedID {
			case pp.HandshakeExtendedID:
				// TODO: Create a bencode struct for this.
				var d map[string]interface{}
				err = bencode.Unmarshal(msg.ExtendedPayload, &d)
				if err != nil {
					err = fmt.Errorf("error decoding extended message payload: %s", err)
					break
				}
				// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
				if reqq, ok := d["reqq"]; ok {
					if i, ok := reqq.(int64); ok {
						c.PeerMaxRequests = int(i)
					}
				}
				if v, ok := d["v"]; ok {
					// A malformed handshake value shouldn't panic us.
					c.PeerClientName, _ = v.(string)
				}
				m, ok := d["m"]
				if !ok {
					err = errors.New("handshake missing m item")
					break
				}
				mTyped, ok := m.(map[string]interface{})
				if !ok {
					err = errors.New("handshake m value is not dict")
					break
				}
				if c.PeerExtensionIDs == nil {
					c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
				}
				for name, v := range mTyped {
					id, ok := v.(int64)
					if !ok {
						log.Printf("bad handshake m item extension ID type: %T", v)
						continue
					}
					if id == 0 {
						delete(c.PeerExtensionIDs, name)
					} else {
						if c.PeerExtensionIDs[name] == 0 {
							supportedExtensionMessages.Add(name, 1)
						}
						c.PeerExtensionIDs[name] = byte(id)
					}
				}
				metadata_sizeUntyped, ok := d["metadata_size"]
				if ok {
					metadata_size, ok := metadata_sizeUntyped.(int64)
					if !ok {
						log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
					} else {
						err = t.setMetadataSize(metadata_size)
						if err != nil {
							err = fmt.Errorf("error setting metadata size to %d: %s", metadata_size, err)
							break
						}
					}
				}
				if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
					c.requestPendingMetadata()
				}
			case metadataExtendedId:
				err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
				if err != nil {
					err = fmt.Errorf("error handling metadata extension message: %s", err)
				}
			case pexExtendedId:
				if cl.config.DisablePEX {
					break
				}
				var pexMsg peerExchangeMessage
				err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
				if err != nil {
					err = fmt.Errorf("error unmarshalling PEX message: %s", err)
					break
				}
				t.addPeers(func() (ret []Peer) {
					for i, cp := range pexMsg.Added {
						p := Peer{
							IP:     make([]byte, 4),
							Port:   int(cp.Port),
							Source: peerSourcePEX,
						}
						if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
							p.SupportsEncryption = true
						}
						missinggo.CopyExact(p.IP, cp.IP[:])
						ret = append(ret, p)
					}
					return
				}())
			default:
				err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
			}
			if err != nil {
				// That client uses its own extension IDs for outgoing message
				// types, which is incorrect.
				if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
					strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
					return nil
				}
			}
		case pp.Port:
			if cl.dHT == nil {
				break
			}
			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			go cl.dHT.Ping(pingAddr, nil)
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}
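// The TODO in the handshake case above suggests decoding the extended
// handshake into a struct rather than a map[string]interface{}. A sketch
// (not part of the original file; the field set is inferred from the keys
// read above, and it assumes the bencode package's struct tag support):
type extendedHandshakeMessage struct {
	M            map[string]int64 `bencode:"m"`
	Reqq         int              `bencode:"reqq,omitempty"`
	V            string           `bencode:"v,omitempty"`
	MetadataSize int64            `bencode:"metadata_size,omitempty"`
}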
// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}
// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
	t := c.t
	cl := t.cl
	chunksReceived.Add(1)

	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

	// Request has been satisfied.
	if cl.connDeleteRequest(t, c, req) {
		defer c.updateRequests()
	} else {
		unexpectedChunksReceived.Add(1)
	}

	// Do we actually want this chunk?
	if !t.wantPiece(req) {
		unwantedChunksReceived.Add(1)
		c.UnwantedChunksReceived++
		return
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.UsefulChunksReceived++
	c.lastUsefulChunkReceived = time.Now()

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.updateRequests()
	}

	cl.mu.Unlock()
	// Write the chunk out. Note that the upper bound on chunk writing
	// concurrency will be the number of connections.
	err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	cl.mu.Lock()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%x): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.updatePieceCompletion(int(msg.Index))
		return
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
	}

	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[int]struct{})
	}
	c.peerTouchedPieces[index] = struct{}{}

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))
}
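// Worked example (not part of the original file): with the usual 16 KiB
// chunk size, a chunk starting at offset 0x8000 within its piece is chunk
// index 2, since chunkIndex divides the chunk's Begin offset by the chunk
// size.
func exampleChunkIndex() int {
	cs := chunkSpec{Begin: 0x8000, Length: 0x4000}
	return chunkIndex(cs, 0x4000) // 0x8000 / 0x4000 == 2
}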
// Also handles choking and unchoking of the remote peer.
func (c *connection) upload() {
	t := c.t
	cl := t.cl
	if cl.config.NoUpload {
		return
	}
	if !c.PeerInterested {
		return
	}
	seeding := t.seeding()
	if !seeding && !c.peerHasWantedPieces() {
		// There's no reason to upload to this peer.
		return
	}
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
		// We want to upload to the peer.
		c.Unchoke()
		for r := range c.PeerRequests {
			res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				go func() {
					time.Sleep(delay)
					cl.mu.Lock()
					defer cl.mu.Unlock()
					c.upload()
				}()
				return
			}
			err := cl.sendChunk(t, c, r)
			if err != nil {
				i := int(r.Index)
				if t.pieceComplete(i) {
					t.updatePieceCompletion(i)
					if !t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Printf("error sending chunk %+v to peer: %s", r, err)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			goto another
		}
		return
	}
	c.Choke()
}
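// A sketch (not part of the original file) of the reservation pattern used
// in upload above, assuming cl.uploadLimit is a *rate.Limiter from
// golang.org/x/time/rate: reserve the chunk's byte count, and if the limiter
// imposes a delay, cancel and retry later rather than sleeping while holding
// the client lock.
func exampleReserveChunk(limit *rate.Limiter, length int) (ok bool) {
	res := limit.ReserveN(time.Now(), length)
	if !res.OK() {
		// length exceeds the limiter's burst size; it can never be sent.
		return false
	}
	if res.Delay() > 0 {
		res.Cancel()
		return false // come back after the delay
	}
	return true
}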
func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
	return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}