17 "github.com/anacrolix/torrent/mse"
19 "github.com/anacrolix/missinggo"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/iter"
22 "github.com/anacrolix/missinggo/prioritybitmap"
24 "github.com/anacrolix/torrent/bencode"
25 pp "github.com/anacrolix/torrent/peer_protocol"
type peerSource string

const (
	peerSourceTracker         = "T" // It's the default.
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg"
	peerSourceDHTAnnouncePeer = "Ha"
	peerSourcePEX             = "X"
)

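// These short codes end up concatenated into the per-connection flag strings
// (see connectionFlags below), which is why they are kept to one or two
// characters.
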
// Maintains the state of a connection with a peer.
type connection struct {
	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn net.Conn
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	headerEncrypted bool
	cryptoMethod    mse.CryptoMethod
	Discovery       peerSource
	closed          missinggo.Event

	stats                  ConnStats
	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int
	goodPiecesDirtied      int
	badPiecesDirtied       int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// torrent info.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	postedBuffer bytes.Buffer
	writerCond   sync.Cond
}

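// Note: connection state is guarded by the owning Client's lock (see mu
// below) rather than a per-connection mutex; both mainReadLoop and writer
// run with it held except around blocking I/O.
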
func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the PeerPieces slice length. Returns an error if the existing
// slice is invalid, such as having received a badly sized BITFIELD or an
// invalid HAVE message.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, -1)
	cn.peerPiecesChanged()
	return nil
}

func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.cryptoMethod == mse.CryptoMethodRC4 {
		c('E')
	} else if cn.headerEncrypted {
		c('e')
	}
	ret += string(cn.Discovery)
	return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}

func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
		cn.chunksSent,
		cn.numLocalRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
	)
	fmt.Fprintf(w, "    next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}

func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
	pb.IterTyped(func(i int) bool {
		if len(ret) >= n {
			return false
		}
		ret = append(ret, i)
		return true
	})
	return
}

func (cn *connection) Close() {
	cn.closed.Set()
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	// TODO: This call blocks sometimes, why?
	go cn.conn.Close()
}

func (cn *connection) PeerHasPiece(piece int) bool {
	return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

func (cn *connection) Post(msg pp.Message) {
	messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.postedBuffer.Write(msg.MustMarshalBinary())
	cn.tickleWriter()
}

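// Post only buffers the marshalled message; the socket write happens
// asynchronously in the writer goroutine once tickleWriter wakes it. An
// illustrative call site:
//
//	cn.Post(pp.Message{Type: pp.Unchoke}) // returns immediately, sent later
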
func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
	ret = cn.PeerMaxRequests
	if ret > 64 {
		// Don't pipeline more than this many requests, regardless of what
		// the peer advertises.
		ret = 64
	}
	return
}

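// For example, a peer advertising a reqq of 250 in its extended handshake
// (see mainReadLoop) would still be limited to 64 outstanding requests by
// the clamp above.
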
// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
	if cn.PeerRequests == nil {
		return false
	}
	if _, ok := cn.PeerRequests[r]; !ok {
		return false
	}
	delete(cn.PeerRequests, r)
	return true
}

func (cn *connection) Choke() {
	if cn.Choked {
		return
	}
	cn.Post(pp.Message{
		Type: pp.Choke,
	})
	cn.PeerRequests = nil
	cn.Choked = true
}

func (cn *connection) Unchoke() {
	if !cn.Choked {
		return
	}
	cn.Post(pp.Message{
		Type: pp.Unchoke,
	})
	cn.Choked = false
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			}
			return pp.NotInterested
		}(),
	})
}

func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	numFillBuffers.Add(1)
	cancel, new, i := cn.desiredRequestState()
	if !cn.SetInterested(i, msg) {
		return
	}
	if cancel && len(cn.requests) != 0 {
		fillBufferSentCancels.Add(1)
		for r := range cn.requests {
			cn.deleteRequest(r)
			// log.Printf("%p: cancelling request: %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Cancel,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
	}
	if len(new) != 0 {
		fillBufferSentRequests.Add(1)
		for _, r := range new {
			if cn.requests == nil {
				cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
			}
			cn.requests[r] = struct{}{}
			// log.Printf("%p: requesting %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Request,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
		// If we didn't completely top up the requests, we shouldn't mark the
		// low water, since we'll want to top up the requests as soon as we
		// have more write buffer space.
		cn.requestsLowWater = len(cn.requests) / 2
	}
}

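// The request pipeline uses simple hysteresis: desiredRequestState declines
// to add requests while more than requestsLowWater are outstanding, and
// otherwise tops up toward nominalMaxRequests (the high water). Having just
// topped up to 64, for instance, the low water is set to 32, so no new
// requests are issued until the outstanding count falls to 32 or below.
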
// Writes buffered messages to the socket, sending keepalives when idle.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		buf       bytes.Buffer
		lastWrite time.Time = time.Now()
	)
	var keepAliveTimer *time.Timer
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.tickleWriter()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer keepAliveTimer.Stop()
	for {
		buf.Write(cn.postedBuffer.Bytes())
		cn.postedBuffer.Reset()
		if buf.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				buf.Write(msg.MustMarshalBinary())
				return buf.Len() < 1<<16
			})
		}
		if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if buf.Len() == 0 {
			cn.writerCond.Wait()
			continue
		}
		cn.mu().Unlock()
		// log.Printf("writing %d bytes", buf.Len())
		n, err := cn.w.Write(buf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		if n != buf.Len() {
			panic("short write")
		}
		buf.Reset()
	}
}

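// A rough sketch of the writer's lifecycle (illustrative only; the actual
// wiring happens where connections are established):
//
//	go cn.writer(time.Minute) // one writer goroutine per connection
//	cn.Post(msg)              // producers buffer bytes and tickle the writer
//	cn.writerCond.Broadcast() // what tickleWriter does to wake it
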
func (cn *connection) Have(piece int) {
	for piece >= len(cn.sentHaves) {
		cn.sentHaves = append(cn.sentHaves, false)
	}
	if cn.sentHaves[piece] {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	// Make a copy of haves, as that's read when the message is marshalled
	// without the lock. Also it obviously shouldn't change in the Msg due to
	// changes in .sentHaves.
	cn.sentHaves = append([]bool(nil), haves...)
}

func nextRequestState(
	networkingEnabled bool,
	currentRequests map[request]struct{},
	peerChoking bool,
	nextPieces *prioritybitmap.PriorityBitmap,
	pendingChunks func(piece int, f func(chunkSpec) bool) bool,
	requestsLowWater int,
	requestsHighWater int,
) (
	cancel bool,
	newRequests []request,
	interested bool,
) {
	if !networkingEnabled || nextPieces.IsEmpty() {
		return true, nil, false
	}
	if peerChoking || len(currentRequests) > requestsLowWater {
		return false, nil, !nextPieces.IsEmpty()
	}
	nextPieces.IterTyped(func(piece int) bool {
		return pendingChunks(piece, func(cs chunkSpec) bool {
			r := request{pp.Integer(piece), cs}
			if _, ok := currentRequests[r]; !ok {
				if newRequests == nil {
					newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
				}
				newRequests = append(newRequests, r)
			}
			return len(currentRequests)+len(newRequests) < requestsHighWater
		})
	})
	return false, newRequests, true
}

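// Worked example, assuming a low water of 32 and a high water of 64: with 40
// requests outstanding and an unchoked peer this returns (false, nil, true),
// i.e. stay interested but send nothing; once outstanding requests drop to
// 32 or fewer it walks nextPieces and returns enough new requests to reach
// the high water again.
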
func (cn *connection) updateRequests() {
	cn.tickleWriter()
}

func (cn *connection) desiredRequestState() (bool, []request, bool) {
	return nextRequestState(
		cn.t.networkingEnabled,
		cn.requests,
		cn.PeerChoked,
		&cn.pieceRequestOrder,
		func(piece int, f func(chunkSpec) bool) bool {
			return undirtiedChunks(piece, cn.t, f)
		},
		cn.requestsLowWater,
		cn.nominalMaxRequests(),
	)
}

func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}

// TODO: Check that callers of this also update the request state as needed.
func (cn *connection) stopRequestingPiece(piece int) bool {
	return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and
// pseudorandomly avoids connections always requesting the same pieces and
// thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		return cn.stopRequestingPiece(piece)
	}
	prio := cn.getPieceInclination()[piece]
	switch tpp {
	case PiecePriorityNormal:
	case PiecePriorityReadahead:
		prio -= cn.t.numPieces()
	case PiecePriorityNext, PiecePriorityNow:
		prio -= 2 * cn.t.numPieces()
	default:
		panic(tpp)
	}
	return cn.pieceRequestOrder.Set(piece, prio)
}

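// Lower values sort earlier in pieceRequestOrder, so subtracting numPieces
// per tier keeps the tiers disjoint: provided the inclination values are
// bounded by the piece count, every readahead piece outranks every
// normal-priority piece, and Next/Now pieces outrank both.
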
func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		prioritiesChanged := false
		for i := range iter.N(cn.t.numPieces()) {
			if cn.updatePiecePriority(i) {
				prioritiesChanged = true
			}
		}
		if prioritiesChanged {
			cn.updateRequests()
		}
	}
}

func (cn *connection) raisePeerMinPieces(newMin int) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}

func (cn *connection) peerSentHave(piece int) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	if cn.updatePiecePriority(piece) {
		cn.updateRequests()
	}
	return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerHasAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted.
	cn.raisePeerMinPieces(len(bf) - 7)
	if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(i + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}

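// The len(bf)-7 arithmetic: a 1001-piece torrent arrives as 126 bytes = 1008
// bits of bitfield, of which at most the trailing 7 are padding, so at least
// len(bf)-7 = 1001 pieces must exist.
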
func (cn *connection) peerSentHaveAll() error {
	cn.peerHasAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerHasAll = false
	cn.peerPiecesChanged()
	return nil
}

func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs["ut_metadata"] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}

func (cn *connection) wroteMsg(msg *pp.Message) {
	messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.stats.readMsg(msg)
	cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
	cn.stats.wroteBytes(n)
	if cn.t != nil {
		cn.t.stats.wroteBytes(n)
	}
}

func (cn *connection) readBytes(n int64) {
	cn.stats.readBytes(n)
	if cn.t != nil {
		cn.t.stats.readBytes(n)
	}
}

// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() {
		return c.PeerInterested
	}
	return c.peerHasWantedPieces()
}

func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReader(c.r),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var (
			msg pp.Message
			err error
		)
		func() {
			cl.mu.Unlock()
			defer cl.mu.Lock()
			err = decoder.Decode(&msg)
		}()
		if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.readMsg(&msg)
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.requests = nil
			// We can then reset our interest.
			c.updateRequests()
		case pp.Reject:
			if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
				c.updateRequests()
			}
		case pp.Unchoke:
			c.PeerChoked = false
			c.tickleWriter()
		case pp.Interested:
			c.PeerInterested = true
			c.upload()
		case pp.NotInterested:
			c.PeerInterested = false
			c.Choke()
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			if c.Choked {
				break
			}
			if !c.PeerInterested {
				err = errors.New("peer sent request but isn't interested")
				break
			}
			if !t.havePiece(msg.Index.Int()) {
				// This isn't necessarily them screwing up. We can drop pieces
				// from our storage, and can't communicate this to peers
				// except by reconnecting.
				requestsReceivedForMissingPieces.Add(1)
				err = errors.New("peer requested piece we don't have")
				break
			}
			if c.PeerRequests == nil {
				c.PeerRequests = make(map[request]struct{}, maxRequests)
			}
			c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
			c.upload()
		case pp.Cancel:
			req := newRequest(msg.Index, msg.Begin, msg.Length)
			if !c.PeerCancel(req) {
				unexpectedCancels.Add(1)
			}
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.peerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(msg.Piece)
			}
		case pp.Extended:
			switch msg.ExtendedID {
			case pp.HandshakeExtendedID:
				// TODO: Create a bencode struct for this.
				var d map[string]interface{}
				err = bencode.Unmarshal(msg.ExtendedPayload, &d)
				if err != nil {
					err = fmt.Errorf("error decoding extended message payload: %s", err)
					break
				}
				// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
				if reqq, ok := d["reqq"]; ok {
					if i, ok := reqq.(int64); ok {
						c.PeerMaxRequests = int(i)
					}
				}
				if v, ok := d["v"]; ok {
					c.PeerClientName = v.(string)
				}
				m, ok := d["m"]
				if !ok {
					err = errors.New("handshake missing m item")
					break
				}
				mTyped, ok := m.(map[string]interface{})
				if !ok {
					err = errors.New("handshake m value is not dict")
					break
				}
				if c.PeerExtensionIDs == nil {
					c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
				}
				for name, v := range mTyped {
					id, ok := v.(int64)
					if !ok {
						log.Printf("bad handshake m item extension ID type: %T", v)
						continue
					}
					if id == 0 {
						delete(c.PeerExtensionIDs, name)
					} else {
						if c.PeerExtensionIDs[name] == 0 {
							supportedExtensionMessages.Add(name, 1)
						}
						c.PeerExtensionIDs[name] = byte(id)
					}
				}
				if metadata_sizeUntyped, ok := d["metadata_size"]; ok {
					metadata_size, ok := metadata_sizeUntyped.(int64)
					if !ok {
						log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
					} else if err = t.setMetadataSize(metadata_size); err != nil {
						err = fmt.Errorf("error setting metadata size to %d", metadata_size)
						break
					}
				}
				if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
					c.requestPendingMetadata()
				}
			case metadataExtendedId:
				err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
				if err != nil {
					err = fmt.Errorf("error handling metadata extension message: %s", err)
				}
			case pexExtendedId:
				if cl.config.DisablePEX {
					break
				}
				var pexMsg peerExchangeMessage
				err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
				if err != nil {
					err = fmt.Errorf("error unmarshalling PEX message: %s", err)
					break
				}
				go func() {
					cl.mu.Lock()
					defer cl.mu.Unlock()
					t.addPeers(func() (ret []Peer) {
						for i, cp := range pexMsg.Added {
							p := Peer{
								IP:     make([]byte, 4),
								Port:   cp.Port,
								Source: peerSourcePEX,
							}
							if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
								p.SupportsEncryption = true
							}
							missinggo.CopyExact(p.IP, cp.IP[:])
							ret = append(ret, p)
						}
						return
					}())
				}()
			default:
				err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
			}
			if err != nil {
				// That client uses its own extension IDs for outgoing message
				// types, which is incorrect.
				if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
					strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
					return nil
				}
			}
		case pp.Port:
			if cl.dHT == nil {
				break
			}
			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			go cl.dHT.Ping(pingAddr, nil)
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}

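// Note the lock discipline above: the client lock is dropped only around the
// blocking Decode call and reacquired before any message handling, so every
// case in the switch runs under the same lock that guards connection state.
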
// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
	t := c.t
	cl := t.cl
	chunksReceived.Add(1)

	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

	// Request has been satisfied.
	if c.deleteRequest(req) {
		c.updateRequests()
	} else {
		unexpectedChunksReceived.Add(1)
	}

	// Do we actually want this chunk?
	if !t.wantPiece(req) {
		unwantedChunksReceived.Add(1)
		c.UnwantedChunksReceived++
		return
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.UsefulChunksReceived++
	c.lastUsefulChunkReceived = time.Now()

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.updateRequests()
	}

	cl.mu.Unlock()
	// Write the chunk out. Note that the upper bound on chunk writing
	// concurrency will be the number of connections.
	err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	cl.mu.Lock()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%x): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.pendRequest(req)
		t.updatePieceCompletion(int(msg.Index))
		return
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
	}

	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[int]struct{})
	}
	c.peerTouchedPieces[index] = struct{}{}

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))
}

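// The pendingWrites bracketing is what makes it safe to drop the client lock
// around writeChunk: the piece tracks in-flight writes, so it won't be
// treated as settled (e.g. hashed) while a write it doesn't know about is
// still outstanding.
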
// Also handles choking and unchoking of the remote peer.
func (c *connection) upload() {
	t := c.t
	cl := t.cl
	if cl.config.NoUpload {
		return
	}
	if !c.PeerInterested {
		return
	}
	seeding := t.seeding()
	if !seeding && !c.peerHasWantedPieces() {
		// There's no reason to upload to this peer.
		return
	}
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
		// We want to upload to the peer.
		c.Unchoke()
		for r := range c.PeerRequests {
			res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				go func() {
					time.Sleep(delay)
					cl.mu.Lock()
					defer cl.mu.Unlock()
					c.upload()
				}()
				return
			}
			err := cl.sendChunk(t, c, r)
			if err != nil {
				i := int(r.Index)
				if t.pieceComplete(i) {
					t.updatePieceCompletion(i)
					if !t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Printf("error sending chunk %+v to peer: %s", r, err)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			goto another
		}
		return
	}
	c.Choke()
}

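// The "seeding || chunksSent < UsefulChunksReceived+6" condition is a mild
// tit-for-tat: when not seeding, the peer stays unchoked only while we've
// sent it fewer than six chunks more than it has given us.
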
func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
	return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
	if _, ok := c.requests[r]; !ok {
		return false
	}
	delete(c.requests, r)
	return true
}

func (c *connection) tickleWriter() {
	c.writerCond.Broadcast()
}