17 "github.com/anacrolix/torrent/mse"
19 "github.com/anacrolix/missinggo"
20 "github.com/anacrolix/missinggo/bitmap"
21 "github.com/anacrolix/missinggo/iter"
22 "github.com/anacrolix/missinggo/prioritybitmap"
24 "github.com/anacrolix/torrent/bencode"
25 pp "github.com/anacrolix/torrent/peer_protocol"
type peerSource string

const (
	peerSourceTracker         = "T" // It's the default.
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg"
	peerSourceDHTAnnouncePeer = "Ha"
	peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn net.Conn
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	headerEncrypted bool
	cryptoMethod    mse.CryptoMethod
	Discovery       peerSource
	closed          missinggo.Event

	stats                  ConnStats
	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int
	goodPiecesDirtied      int
	badPiecesDirtied       int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	PeerClientName     string
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// metainfo.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	postedBuffer bytes.Buffer
	uploadTimer  *time.Timer
	writerCond   sync.Cond
}

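// A hedged sketch, not part of the original file: roughly how a connection
// might be initialized. newConnectionExample is a hypothetical name, and the
// defaults shown (choked in both directions, a modest request cap until the
// peer advertises its reqq) are assumptions for illustration only.
func newConnectionExample(nc net.Conn) *connection {
	cn := &connection{
		conn:            nc,
		Choked:          true, // We start out choking the peer.
		PeerChoked:      true, // And assume they're choking us.
		PeerMaxRequests: 250,  // Assumed cap until the extended handshake arrives.
	}
	cn.setRW(nc)
	return cn
}
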
func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the length of the peer's claimed piece set. Returns an error if
// the existing state is invalid, such as from a badly sized BITFIELD or
// invalid HAVE messages.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, -1)
	cn.peerPiecesChanged()
	return nil
}

func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.cryptoMethod == mse.CryptoMethodRC4 {
		c('E')
	} else if cn.headerEncrypted {
		c('e')
	}
	ret += string(cn.Discovery)
	return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}

func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
		cn.chunksSent,
		cn.numLocalRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
	)
	fmt.Fprintf(w, "    next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}

func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
	pb.IterTyped(func(i int) bool {
		if len(ret) >= n {
			return false
		}
		ret = append(ret, i)
		return true
	})
	return
}

func (cn *connection) Close() {
	if !cn.closed.Set() {
		return
	}
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		// TODO: This call blocks sometimes, why? Maybe it was the Go utp
		// implementation?
		err := cn.conn.Close()
		if err != nil {
			log.Printf("error closing connection net.Conn: %s", err)
		}
	}
}

func (cn *connection) PeerHasPiece(piece int) bool {
	return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

func (cn *connection) Post(msg pp.Message) {
	messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.postedBuffer.Write(msg.MustMarshalBinary())
	cn.tickleWriter()
}

func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

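// A hedged illustration of the wire payload built above: bencode emits dict
// keys in sorted order, so requesting metadata piece 1 (msg_type 0 is
// "request" in BEP 9) marshals to a fixed byte string. This example function
// is not part of the original file.
func exampleMetadataRequestPayload() {
	b, _ := bencode.Marshal(map[string]int{"msg_type": 0, "piece": 1})
	fmt.Println(string(b)) // d8:msg_typei0e5:piecei1ee
}
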
func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
	ret = cn.PeerMaxRequests
	if ret > 64 {
		ret = 64
	}
	return
}

// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
	if cn.PeerRequests == nil {
		return false
	}
	if _, ok := cn.PeerRequests[r]; !ok {
		return false
	}
	delete(cn.PeerRequests, r)
	return true
}

func (cn *connection) Choke(msg func(pp.Message) bool) bool {
	if cn.Choked {
		return true
	}
	cn.PeerRequests = nil
	cn.Choked = true
	return msg(pp.Message{
		Type: pp.Choke,
	})
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
	if !cn.Choked {
		return true
	}
	cn.Choked = false
	return msg(pp.Message{
		Type: pp.Unchoke,
	})
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			}
			return pp.NotInterested
		}(),
	})
}

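// A hedged note on the msg callback convention used by Choke, Unchoke and
// SetInterested above: the callback queues one message and reports whether
// the caller may post more, which is how writer() bounds its buffer (see
// the buf.Len() < 1<<16 check there). For example:
//
//	cn.SetInterested(true, func(m pp.Message) bool {
//		buf.Write(m.MustMarshalBinary())
//		return buf.Len() < 1<<16
//	})
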
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	numFillBuffers.Add(1)
	cancel, new, i := cn.desiredRequestState()
	if !cn.SetInterested(i, msg) {
		return
	}
	if cancel && len(cn.requests) != 0 {
		fillBufferSentCancels.Add(1)
		for r := range cn.requests {
			cn.deleteRequest(r)
			// log.Printf("%p: cancelling request: %v", cn, r)
			if !msg(pp.Message{Type: pp.Cancel, Index: r.Index, Begin: r.Begin, Length: r.Length}) {
				return
			}
		}
	}
	if len(new) != 0 {
		fillBufferSentRequests.Add(1)
		for _, r := range new {
			if cn.requests == nil {
				cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
			}
			cn.requests[r] = struct{}{}
			// log.Printf("%p: requesting %v", cn, r)
			if !msg(pp.Message{Type: pp.Request, Index: r.Index, Begin: r.Begin, Length: r.Length}) {
				return
			}
		}
		// If we didn't completely top up the requests, we shouldn't mark the
		// low water, since we'll want to top up the requests as soon as we
		// have more write buffer space.
		cn.requestsLowWater = len(cn.requests) / 2
	}
	cn.upload(msg)
}

// Writes buffers to the socket from the write channel.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		buf       bytes.Buffer
		lastWrite time.Time = time.Now()
	)
	var keepAliveTimer *time.Timer
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.tickleWriter()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer cn.Close()
	defer keepAliveTimer.Stop()
	for {
		buf.Write(cn.postedBuffer.Bytes())
		cn.postedBuffer.Reset()
		if buf.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				buf.Write(msg.MustMarshalBinary())
				return buf.Len() < 1<<16
			})
		}
		if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if buf.Len() == 0 {
			cn.writerCond.Wait()
			continue
		}
		cn.mu().Unlock()
		// log.Printf("writing %d bytes", buf.Len())
		n, err := cn.w.Write(buf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		buf.Reset()
	}
}

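// A hedged note: pp.Message{Keepalive: true} marshals to the BitTorrent wire
// protocol's zero-length frame (four zero bytes), so an otherwise idle
// connection costs only 4 bytes per keepAliveTimeout interval.
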
func (cn *connection) Have(piece int) {
	for piece >= len(cn.sentHaves) {
		cn.sentHaves = append(cn.sentHaves, false)
	}
	if cn.sentHaves[piece] {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	// Make a copy of haves, as that's read when the message is marshalled
	// without the lock. Also it obviously shouldn't change in the Msg due to
	// changes in .sentHaves.
	cn.sentHaves = append([]bool(nil), haves...)
}

func nextRequestState(
	networkingEnabled bool,
	currentRequests map[request]struct{},
	peerChoking bool,
	nextPieces *prioritybitmap.PriorityBitmap,
	pendingChunks func(piece int, f func(chunkSpec) bool) bool,
	requestsLowWater int,
	requestsHighWater int,
) (
	cancelExisting bool,
	newRequests []request,
	interested bool,
) {
	if !networkingEnabled || nextPieces.IsEmpty() {
		return true, nil, false
	}
	if peerChoking || len(currentRequests) > requestsLowWater {
		return false, nil, !nextPieces.IsEmpty()
	}
	nextPieces.IterTyped(func(piece int) bool {
		return pendingChunks(piece, func(cs chunkSpec) bool {
			r := request{pp.Integer(piece), cs}
			if _, ok := currentRequests[r]; !ok {
				if newRequests == nil {
					newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
				}
				newRequests = append(newRequests, r)
			}
			return len(currentRequests)+len(newRequests) < requestsHighWater
		})
	})
	return false, newRequests, true
}

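// A hedged usage sketch of nextRequestState, not part of the original file:
// with no outstanding requests, an unchoked peer, and one prioritized piece
// of two 16 KiB chunks, it proposes both chunks and reports interest. The
// chunk layout and water marks here are assumptions for the example.
func exampleNextRequestState() (newRequests []request, interested bool) {
	var order prioritybitmap.PriorityBitmap
	order.Set(0, 0) // piece 0, priority 0
	_, newRequests, interested = nextRequestState(
		true,  // networking enabled
		nil,   // no requests currently outstanding
		false, // peer is not choking us
		&order,
		func(piece int, f func(chunkSpec) bool) bool {
			// Pretend the piece has two 16 KiB chunks left to request.
			return f(chunkSpec{0, 16 << 10}) && f(chunkSpec{16 << 10, 16 << 10})
		},
		16, // low water
		64, // high water
	)
	return // both chunks of piece 0, interested == true
}
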
func (cn *connection) updateRequests() {
	cn.tickleWriter()
}

func (cn *connection) desiredRequestState() (bool, []request, bool) {
	return nextRequestState(
		cn.t.networkingEnabled,
		cn.requests,
		cn.PeerChoked,
		&cn.pieceRequestOrder,
		func(piece int, f func(chunkSpec) bool) bool {
			return undirtiedChunks(piece, cn.t, f)
		},
		cn.requestsLowWater,
		cn.nominalMaxRequests(),
	)
}

func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}

// TODO: Check that callers also call updateRequests as needed.
func (cn *connection) stopRequestingPiece(piece int) bool {
	return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and
// pseudorandomly avoids connections always requesting the same pieces, and
// thus wasting effort. Returns true if the piece's position in the request
// order changed.
func (cn *connection) updatePiecePriority(piece int) bool {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		return cn.stopRequestingPiece(piece)
	}
	prio := cn.getPieceInclination()[piece]
	switch tpp {
	case PiecePriorityNormal:
	case PiecePriorityReadahead:
		prio -= cn.t.numPieces()
	case PiecePriorityNext, PiecePriorityNow:
		prio -= 2 * cn.t.numPieces()
	default:
		panic(tpp)
	}
	return cn.pieceRequestOrder.Set(piece, prio)
}

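// A hedged, illustrative sketch of the arithmetic above, not part of the
// original file: offsets of whole multiples of numPieces mean any Readahead
// piece outranks every Normal piece regardless of inclination, since the
// priority bitmap serves lower values first. The concrete numbers are
// assumptions for the example.
func examplePiecePriorityOrdering() {
	const numPieces = 100
	normal := 42                    // Normal piece with a good inclination.
	readahead := 99 - numPieces     // Readahead piece, worst inclination: -1.
	fmt.Println(readahead < normal) // true: the readahead piece still sorts first.
}
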
func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		prioritiesChanged := false
		for i := range iter.N(cn.t.numPieces()) {
			if cn.updatePiecePriority(i) {
				prioritiesChanged = true
			}
		}
		if prioritiesChanged {
			cn.updateRequests()
		}
	}
}

func (cn *connection) raisePeerMinPieces(newMin int) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}

func (cn *connection) peerSentHave(piece int) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	if cn.updatePiecePriority(piece) {
		cn.updateRequests()
	}
	return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerHasAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted: a 24-bit bitfield, for example, implies at least 17 pieces.
	cn.raisePeerMinPieces(len(bf) - 7)
	if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(i + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveAll() error {
	cn.peerHasAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerHasAll = false
	cn.peerPiecesChanged()
	return nil
}

func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs["ut_metadata"] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}

func (cn *connection) wroteMsg(msg *pp.Message) {
	messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.stats.readMsg(msg)
	cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
	cn.stats.wroteBytes(n)
	if cn.t != nil {
		cn.t.stats.wroteBytes(n)
	}
}

func (cn *connection) readBytes(n int64) {
	cn.stats.readBytes(n)
	if cn.t != nil {
		cn.t.stats.readBytes(n)
	}
}

// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() {
		return c.PeerInterested
	}
	return c.peerHasWantedPieces()
}

func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}

// Processes incoming BitTorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReader(c.r),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var (
			msg pp.Message
			err error
		)
		func() {
			cl.mu.Unlock()
			defer cl.mu.Lock()
			err = decoder.Decode(&msg)
		}()
		if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.requests = nil
			// We can then reset our interest.
			c.updateRequests()
		case pp.Reject:
			if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
				c.updateRequests()
			}
		case pp.Unchoke:
			c.PeerChoked = false
			c.tickleWriter()
		case pp.Interested:
			c.PeerInterested = true
			c.tickleWriter()
		case pp.NotInterested:
			c.PeerInterested = false
			c.PeerRequests = nil
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			if c.Choked {
				break
			}
			if len(c.PeerRequests) >= maxRequests {
				break
			}
			if c.PeerRequests == nil {
				c.PeerRequests = make(map[request]struct{}, maxRequests)
			}
			c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
			c.tickleWriter()
		case pp.Cancel:
			req := newRequest(msg.Index, msg.Begin, msg.Length)
			if !c.PeerCancel(req) {
				unexpectedCancels.Add(1)
			}
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.peerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(msg.Piece)
			}
		case pp.Extended:
			switch msg.ExtendedID {
			case pp.HandshakeExtendedID:
				// TODO: Create a bencode struct for this.
				var d map[string]interface{}
				err = bencode.Unmarshal(msg.ExtendedPayload, &d)
				if err != nil {
					err = fmt.Errorf("error decoding extended message payload: %s", err)
					break
				}
				// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
				if reqq, ok := d["reqq"]; ok {
					if i, ok := reqq.(int64); ok {
						c.PeerMaxRequests = int(i)
					}
				}
				if v, ok := d["v"]; ok {
					c.PeerClientName = v.(string)
				}
				m, ok := d["m"]
				if !ok {
					err = errors.New("handshake missing m item")
					break
				}
				mTyped, ok := m.(map[string]interface{})
				if !ok {
					err = errors.New("handshake m value is not dict")
					break
				}
				if c.PeerExtensionIDs == nil {
					c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
				}
				for name, v := range mTyped {
					id, ok := v.(int64)
					if !ok {
						log.Printf("bad handshake m item extension ID type: %T", v)
						continue
					}
					if id == 0 {
						delete(c.PeerExtensionIDs, name)
					} else {
						if c.PeerExtensionIDs[name] == 0 {
							supportedExtensionMessages.Add(name, 1)
						}
						c.PeerExtensionIDs[name] = byte(id)
					}
				}
				if metadata_sizeUntyped, ok := d["metadata_size"]; ok {
					metadata_size, ok := metadata_sizeUntyped.(int64)
					if !ok {
						log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
					} else if err = t.setMetadataSize(metadata_size); err != nil {
						err = fmt.Errorf("error setting metadata size to %d", metadata_size)
						break
					}
				}
				if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
					c.requestPendingMetadata()
				}
			case metadataExtendedId:
				err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
				if err != nil {
					err = fmt.Errorf("error handling metadata extension message: %s", err)
				}
			case pexExtendedId:
				if cl.config.DisablePEX {
					break
				}
				var pexMsg peerExchangeMessage
				err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
				if err != nil {
					err = fmt.Errorf("error unmarshalling PEX message: %s", err)
					break
				}
				t.addPeers(func() (ret []Peer) {
					for i, cp := range pexMsg.Added {
						p := Peer{
							IP:     make([]byte, 4),
							Port:   cp.Port,
							Source: peerSourcePEX,
						}
						if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
							p.SupportsEncryption = true
						}
						missinggo.CopyExact(p.IP, cp.IP[:])
						ret = append(ret, p)
					}
					return
				}())
			default:
				err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
			}
			if err != nil {
				// These clients use their own extension IDs for outgoing
				// message types, which is incorrect.
				if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
					strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
					return nil
				}
			}
		case pp.Port:
			if cl.dHT == nil {
				break
			}
			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			go cl.dHT.Ping(pingAddr, nil)
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}

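// A hedged example of an extended handshake payload as decoded above (all
// values illustrative, not from the original file): bencode.Unmarshal yields
// int64 for integers and string for strings, which is why the type
// assertions in the handshake case look the way they do.
func exampleExtendedHandshake() {
	payload := []byte("d1:md11:ut_metadatai3e6:ut_pexi1ee13:metadata_sizei31235e4:reqqi250e1:v10:ExampleAppe")
	var d map[string]interface{}
	if err := bencode.Unmarshal(payload, &d); err == nil {
		m := d["m"].(map[string]interface{})
		fmt.Println(m["ut_metadata"].(int64)) // 3
	}
}
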
// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
	t := c.t
	cl := t.cl
	chunksReceived.Add(1)

	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

	// Request has been satisfied.
	if c.deleteRequest(req) {
		c.updateRequests()
	} else {
		unexpectedChunksReceived.Add(1)
	}

	// Do we actually want this chunk?
	if !t.wantPiece(req) {
		unwantedChunksReceived.Add(1)
		c.UnwantedChunksReceived++
		return
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.UsefulChunksReceived++
	c.lastUsefulChunkReceived = time.Now()

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.updateRequests()
	}

	cl.mu.Unlock()
	// Write the chunk out. Note that the upper bound on chunk writing
	// concurrency will be the number of connections.
	err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	cl.mu.Lock()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%x): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.pendRequest(req)
		t.updatePieceCompletion(int(msg.Index))
		return
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
	}

	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[int]struct{})
	}
	c.peerTouchedPieces[index] = struct{}{}

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))
}

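// A hedged note: an incoming PIECE message is matched back to the REQUEST it
// satisfies purely by (index, begin, length) via newRequest above, the same
// key under which the request was stored in c.requests when it was sent; a
// chunk whose key no longer matches anything counts as unexpected.
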
// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
	t := c.t
	cl := t.cl
	if cl.config.NoUpload {
		return true
	}
	if !c.PeerInterested {
		return true
	}
	seeding := t.seeding()
	if !seeding && !c.peerHasWantedPieces() {
		// There's no reason to upload to this peer.
		return true
	}
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
		// We want to upload to the peer.
		if !c.Unchoke(msg) {
			return false
		}
		for r := range c.PeerRequests {
			res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
			if !res.OK() {
				panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
			}
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				if c.uploadTimer == nil {
					c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
				} else {
					c.uploadTimer.Reset(delay)
				}
				// Hard to say what to return here.
				return true
			}
			more, err := cl.sendChunk(t, c, r, msg)
			if err != nil {
				i := int(r.Index)
				if t.pieceComplete(i) {
					t.updatePieceCompletion(i)
					if !t.pieceComplete(i) {
						// We had the piece, but not anymore.
					}
				}
				log.Printf("error sending chunk %+v to peer: %s", r, err)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			if !more {
				return false
			}
			goto another
		}
		return true
	}
	return c.Choke(msg)
}

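// A hedged note on the reservation pattern above (golang.org/x/time/rate
// semantics): ReserveN either fails outright (n exceeds the limiter's
// burst), succeeds with Delay() == 0 (send immediately), or succeeds with a
// positive delay, in which case the tokens are handed back via res.Cancel()
// and the writer is re-woken by c.uploadTimer once the delay has elapsed.
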
func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
	return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
	if _, ok := c.requests[r]; !ok {
		return false
	}
	delete(c.requests, r)
	return true
}

func (c *connection) tickleWriter() {
	c.writerCond.Broadcast()
}