]> Sergey Matveev's repositories - btrtrc.git/blob - peerconn.go
Rework to improve work stealing and try to thread peers through all request pieces
[btrtrc.git] / peerconn.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "fmt"
8         "io"
9         "math/rand"
10         "net"
11         "strconv"
12         "strings"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/log"
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/iter"
19         "github.com/anacrolix/missinggo/v2/bitmap"
20         "github.com/anacrolix/missinggo/v2/prioritybitmap"
21         "github.com/anacrolix/multiless"
22         "github.com/anacrolix/torrent/metainfo"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/mse"
26         pp "github.com/anacrolix/torrent/peer_protocol"
27 )
28
29 type PeerSource string
30
31 const (
32         PeerSourceTracker         = "Tr"
33         PeerSourceIncoming        = "I"
34         PeerSourceDhtGetPeers     = "Hg" // Peers we found by searching a DHT.
35         PeerSourceDhtAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
36         PeerSourcePex             = "X"
37         // The peer was given directly, such as through a magnet link.
38         PeerSourceDirect = "M"
39 )
40
41 type peerRequestState struct {
42         data []byte
43 }
44
45 type PeerRemoteAddr interface {
46         String() string
47 }
48
49 type Peer struct {
50         // First to ensure 64-bit alignment for atomics. See #262.
51         _stats ConnStats
52
53         t *Torrent
54
55         peerImpl
56         callbacks *Callbacks
57
58         outgoing   bool
59         Network    string
60         RemoteAddr PeerRemoteAddr
61         // True if the connection is operating over MSE obfuscation.
62         headerEncrypted bool
63         cryptoMethod    mse.CryptoMethod
64         Discovery       PeerSource
65         trusted         bool
66         closed          missinggo.Event
67         // Set true after we've added our ConnStats generated during handshake to
68         // other ConnStat instances as determined when the *Torrent became known.
69         reconciledHandshakeStats bool
70
71         lastMessageReceived     time.Time
72         completedHandshake      time.Time
73         lastUsefulChunkReceived time.Time
74         lastChunkSent           time.Time
75
76         // Stuff controlled by the local peer.
77         interested           bool
78         lastBecameInterested time.Time
79         priorInterest        time.Duration
80
81         lastStartedExpectingToReceiveChunks time.Time
82         cumulativeExpectedToReceiveChunks   time.Duration
83         _chunksReceivedWhileExpecting       int64
84
85         choking          bool
86         requests         map[Request]struct{}
87         requestsLowWater int
88         // Chunks that we might reasonably expect to receive from the peer. Due to
89         // latency, buffering, and implementation differences, we may receive
90         // chunks that are no longer in the set of requests actually want.
91         validReceiveChunks map[Request]int
92         // Indexed by metadata piece, set to true if posted and pending a
93         // response.
94         metadataRequests []bool
95         sentHaves        bitmap.Bitmap
96
97         // Stuff controlled by the remote peer.
98         peerInterested        bool
99         peerChoking           bool
100         peerRequests          map[Request]*peerRequestState
101         PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
102         PeerListenPort        int
103         // The pieces the peer has claimed to have.
104         _peerPieces bitmap.Bitmap
105         // The peer has everything. This can occur due to a special message, when
106         // we may not even know the number of pieces in the torrent yet.
107         peerSentHaveAll bool
108         // The highest possible number of pieces the torrent could have based on
109         // communication with the peer. Generally only useful until we have the
110         // torrent info.
111         peerMinPieces pieceIndex
112         // Pieces we've accepted chunks for from the peer.
113         peerTouchedPieces map[pieceIndex]struct{}
114         peerAllowedFast   bitmap.Bitmap
115
116         PeerMaxRequests  int // Maximum pending requests the peer allows.
117         PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
118         PeerClientName   string
119
120         pieceInclination   []int
121         _pieceRequestOrder prioritybitmap.PriorityBitmap
122
123         logger log.Logger
124 }
125
126 // Maintains the state of a BitTorrent-protocol based connection with a peer.
127 type PeerConn struct {
128         Peer
129
130         // A string that should identify the PeerConn's net.Conn endpoints. The net.Conn could
131         // be wrapping WebRTC, uTP, or TCP etc. Used in writing the conn status for peers.
132         connString string
133
134         // See BEP 3 etc.
135         PeerID             PeerID
136         PeerExtensionBytes pp.PeerExtensionBits
137
138         // The actual Conn, used for closing, and setting socket options.
139         conn net.Conn
140         // The Reader and Writer for this Conn, with hooks installed for stats,
141         // limiting, deadlines etc.
142         w io.Writer
143         r io.Reader
144
145         writeBuffer *bytes.Buffer
146         uploadTimer *time.Timer
147         writerCond  sync.Cond
148
149         pex pexConnState
150 }
151
152 func (cn *PeerConn) connStatusString() string {
153         return fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)
154 }
155
156 func (cn *Peer) updateExpectingChunks() {
157         if cn.expectingChunks() {
158                 if cn.lastStartedExpectingToReceiveChunks.IsZero() {
159                         cn.lastStartedExpectingToReceiveChunks = time.Now()
160                 }
161         } else {
162                 if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
163                         cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
164                         cn.lastStartedExpectingToReceiveChunks = time.Time{}
165                 }
166         }
167 }
168
169 func (cn *Peer) expectingChunks() bool {
170         if len(cn.requests) == 0 {
171                 return false
172         }
173         if !cn.interested {
174                 return false
175         }
176         for r := range cn.requests {
177                 if !cn.remoteChokingPiece(r.Index.Int()) {
178                         return true
179                 }
180         }
181         return false
182 }
183
184 func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
185         return cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(piece))
186 }
187
188 // Returns true if the connection is over IPv6.
189 func (cn *PeerConn) ipv6() bool {
190         ip := cn.remoteIp()
191         if ip.To4() != nil {
192                 return false
193         }
194         return len(ip) == net.IPv6len
195 }
196
197 // Returns true the if the dialer/initiator has the lower client peer ID. TODO: Find the
198 // specification for this.
199 func (cn *PeerConn) isPreferredDirection() bool {
200         return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
201 }
202
203 // Returns whether the left connection should be preferred over the right one,
204 // considering only their networking properties. If ok is false, we can't
205 // decide.
206 func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {
207         var ml multiLess
208         ml.NextBool(l.isPreferredDirection(), r.isPreferredDirection())
209         ml.NextBool(!l.utp(), !r.utp())
210         ml.NextBool(l.ipv6(), r.ipv6())
211         return ml.FinalOk()
212 }
213
214 func (cn *Peer) cumInterest() time.Duration {
215         ret := cn.priorInterest
216         if cn.interested {
217                 ret += time.Since(cn.lastBecameInterested)
218         }
219         return ret
220 }
221
222 func (cn *Peer) peerHasAllPieces() (all bool, known bool) {
223         if cn.peerSentHaveAll {
224                 return true, true
225         }
226         if !cn.t.haveInfo() {
227                 return false, false
228         }
229         return bitmap.Flip(cn._peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true
230 }
231
232 func (cn *PeerConn) locker() *lockWithDeferreds {
233         return cn.t.cl.locker()
234 }
235
236 func (cn *Peer) supportsExtension(ext pp.ExtensionName) bool {
237         _, ok := cn.PeerExtensionIDs[ext]
238         return ok
239 }
240
241 // The best guess at number of pieces in the torrent for this peer.
242 func (cn *Peer) bestPeerNumPieces() pieceIndex {
243         if cn.t.haveInfo() {
244                 return cn.t.numPieces()
245         }
246         return cn.peerMinPieces
247 }
248
249 func (cn *Peer) completedString() string {
250         have := pieceIndex(cn._peerPieces.Len())
251         if cn.peerSentHaveAll {
252                 have = cn.bestPeerNumPieces()
253         }
254         return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
255 }
256
257 func (cn *PeerConn) onGotInfo(info *metainfo.Info) {
258         cn.setNumPieces(info.NumPieces())
259 }
260
261 // Correct the PeerPieces slice length. Return false if the existing slice is invalid, such as by
262 // receiving badly sized BITFIELD, or invalid HAVE messages.
263 func (cn *PeerConn) setNumPieces(num pieceIndex) {
264         cn._peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
265         cn.peerPiecesChanged()
266 }
267
268 func eventAgeString(t time.Time) string {
269         if t.IsZero() {
270                 return "never"
271         }
272         return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
273 }
274
275 func (cn *PeerConn) connectionFlags() (ret string) {
276         c := func(b byte) {
277                 ret += string([]byte{b})
278         }
279         if cn.cryptoMethod == mse.CryptoMethodRC4 {
280                 c('E')
281         } else if cn.headerEncrypted {
282                 c('e')
283         }
284         ret += string(cn.Discovery)
285         if cn.utp() {
286                 c('U')
287         }
288         return
289 }
290
291 func (cn *PeerConn) utp() bool {
292         return parseNetworkString(cn.Network).Udp
293 }
294
295 // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
296 func (cn *Peer) statusFlags() (ret string) {
297         c := func(b byte) {
298                 ret += string([]byte{b})
299         }
300         if cn.interested {
301                 c('i')
302         }
303         if cn.choking {
304                 c('c')
305         }
306         c('-')
307         ret += cn.connectionFlags()
308         c('-')
309         if cn.peerInterested {
310                 c('i')
311         }
312         if cn.peerChoking {
313                 c('c')
314         }
315         return
316 }
317
318 func (cn *Peer) downloadRate() float64 {
319         num := cn._stats.BytesReadUsefulData.Int64()
320         if num == 0 {
321                 return 0
322         }
323         return float64(num) / cn.totalExpectingTime().Seconds()
324 }
325
326 func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
327         ret = make(map[pieceIndex]int)
328         for r := range cn.requests {
329                 ret[pieceIndex(r.Index)]++
330         }
331         return
332 }
333
334 func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
335         // \t isn't preserved in <pre> blocks?
336         if cn.closed.IsSet() {
337                 fmt.Fprint(w, "CLOSED: ")
338         }
339         fmt.Fprintln(w, cn.connStatusString())
340         prio, err := cn.peerPriority()
341         prioStr := fmt.Sprintf("%08x", prio)
342         if err != nil {
343                 prioStr += ": " + err.Error()
344         }
345         fmt.Fprintf(w, "    bep40-prio: %v\n", prioStr)
346         fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
347                 eventAgeString(cn.lastMessageReceived),
348                 eventAgeString(cn.completedHandshake),
349                 eventAgeString(cn.lastHelpful()),
350                 cn.cumInterest(),
351                 cn.totalExpectingTime(),
352         )
353         fmt.Fprintf(w,
354                 "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: %d/(%d/%d)-%d/%d, flags: %s, dr: %.1f KiB/s\n",
355                 cn.completedString(),
356                 len(cn.peerTouchedPieces),
357                 &cn._stats.ChunksReadUseful,
358                 &cn._stats.ChunksRead,
359                 &cn._stats.ChunksWritten,
360                 cn.numLocalRequests(),
361                 cn.nominalMaxRequests(),
362                 cn.PeerMaxRequests,
363                 len(cn.peerRequests),
364                 localClientReqq,
365                 cn.statusFlags(),
366                 cn.downloadRate()/(1<<10),
367         )
368         fmt.Fprintf(w, "    requested pieces:")
369         for piece, count := range cn.numRequestsByPiece() {
370                 fmt.Fprintf(w, " %v (%v)", piece, count)
371         }
372         fmt.Fprintf(w, "\n")
373 }
374
375 func (p *Peer) close() {
376         if !p.closed.Set() {
377                 return
378         }
379         p.discardPieceInclination()
380         p._pieceRequestOrder.Clear()
381         p.peerImpl.onClose()
382         p.t.decPeerPieceAvailability(p)
383         for _, f := range p.callbacks.PeerClosed {
384                 f(p)
385         }
386 }
387
388 func (cn *PeerConn) onClose() {
389         if cn.pex.IsEnabled() {
390                 cn.pex.Close()
391         }
392         cn.tickleWriter()
393         if cn.conn != nil {
394                 cn.conn.Close()
395         }
396         if cb := cn.callbacks.PeerConnClosed; cb != nil {
397                 cb(cn)
398         }
399 }
400
401 func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
402         return cn.peerSentHaveAll || cn._peerPieces.Contains(bitmap.BitIndex(piece))
403 }
404
405 // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
406 // https://github.com/pion/datachannel/issues/59 is fixed.
407 const writeBufferHighWaterLen = 1 << 15
408
409 // Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is
410 // done asynchronously, so it may be that we're not able to honour backpressure from this method.
411 func (cn *PeerConn) write(msg pp.Message) bool {
412         torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
413         // We don't need to track bytes here because a connection.w Writer wrapper takes care of that
414         // (although there's some delay between us recording the message, and the connection writer
415         // flushing it out.).
416         cn.writeBuffer.Write(msg.MustMarshalBinary())
417         // Last I checked only Piece messages affect stats, and we don't write those.
418         cn.wroteMsg(&msg)
419         cn.tickleWriter()
420         return !cn.writeBufferFull()
421 }
422
423 func (cn *PeerConn) writeBufferFull() bool {
424         return cn.writeBuffer.Len() >= writeBufferHighWaterLen
425 }
426
427 func (cn *PeerConn) requestMetadataPiece(index int) {
428         eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
429         if eID == 0 {
430                 return
431         }
432         if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
433                 return
434         }
435         cn.logger.WithDefaultLevel(log.Debug).Printf("requesting metadata piece %d", index)
436         cn.write(pp.Message{
437                 Type:       pp.Extended,
438                 ExtendedID: eID,
439                 ExtendedPayload: func() []byte {
440                         b, err := bencode.Marshal(map[string]int{
441                                 "msg_type": pp.RequestMetadataExtensionMsgType,
442                                 "piece":    index,
443                         })
444                         if err != nil {
445                                 panic(err)
446                         }
447                         return b
448                 }(),
449         })
450         for index >= len(cn.metadataRequests) {
451                 cn.metadataRequests = append(cn.metadataRequests, false)
452         }
453         cn.metadataRequests[index] = true
454 }
455
456 func (cn *PeerConn) requestedMetadataPiece(index int) bool {
457         return index < len(cn.metadataRequests) && cn.metadataRequests[index]
458 }
459
460 // The actual value to use as the maximum outbound requests.
461 func (cn *Peer) nominalMaxRequests() (ret int) {
462         return int(clamp(1, int64(cn.PeerMaxRequests), 64))
463 }
464
465 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
466         ret = cn.cumulativeExpectedToReceiveChunks
467         if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
468                 ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
469         }
470         return
471
472 }
473
474 func (cn *PeerConn) onPeerSentCancel(r Request) {
475         if _, ok := cn.peerRequests[r]; !ok {
476                 torrent.Add("unexpected cancels received", 1)
477                 return
478         }
479         if cn.fastEnabled() {
480                 cn.reject(r)
481         } else {
482                 delete(cn.peerRequests, r)
483         }
484 }
485
486 func (cn *PeerConn) choke(msg messageWriter) (more bool) {
487         if cn.choking {
488                 return true
489         }
490         cn.choking = true
491         more = msg(pp.Message{
492                 Type: pp.Choke,
493         })
494         if cn.fastEnabled() {
495                 for r := range cn.peerRequests {
496                         // TODO: Don't reject pieces in allowed fast set.
497                         cn.reject(r)
498                 }
499         } else {
500                 cn.peerRequests = nil
501         }
502         return
503 }
504
505 func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
506         if !cn.choking {
507                 return true
508         }
509         cn.choking = false
510         return msg(pp.Message{
511                 Type: pp.Unchoke,
512         })
513 }
514
515 func (cn *Peer) setInterested(interested bool) bool {
516         if cn.interested == interested {
517                 return true
518         }
519         cn.interested = interested
520         if interested {
521                 cn.lastBecameInterested = time.Now()
522         } else if !cn.lastBecameInterested.IsZero() {
523                 cn.priorInterest += time.Since(cn.lastBecameInterested)
524         }
525         cn.updateExpectingChunks()
526         // log.Printf("%p: setting interest: %v", cn, interested)
527         return cn.writeInterested(interested)
528 }
529
530 func (pc *PeerConn) writeInterested(interested bool) bool {
531         return pc.write(pp.Message{
532                 Type: func() pp.MessageType {
533                         if interested {
534                                 return pp.Interested
535                         } else {
536                                 return pp.NotInterested
537                         }
538                 }(),
539         })
540 }
541
542 // The function takes a message to be sent, and returns true if more messages
543 // are okay.
544 type messageWriter func(pp.Message) bool
545
546 func (cn *Peer) shouldRequest(r Request) error {
547         if !cn.peerHasPiece(pieceIndex(r.Index)) {
548                 return errors.New("requesting piece peer doesn't have")
549         }
550         if !cn.t.peerIsActive(cn) {
551                 panic("requesting but not in active conns")
552         }
553         if cn.closed.IsSet() {
554                 panic("requesting when connection is closed")
555         }
556         if cn.t.hashingPiece(pieceIndex(r.Index)) {
557                 panic("piece is being hashed")
558         }
559         if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
560                 panic("piece is queued for hash")
561         }
562         return nil
563 }
564
565 func (cn *Peer) request(r Request) error {
566         if err := cn.shouldRequest(r); err != nil {
567                 panic(err)
568         }
569         if _, ok := cn.requests[r]; ok {
570                 return nil
571         }
572         if cn.numLocalRequests() >= cn.nominalMaxRequests() {
573                 return errors.New("too many outstanding requests")
574         }
575         if cn.requests == nil {
576                 cn.requests = make(map[Request]struct{})
577         }
578         cn.requests[r] = struct{}{}
579         if cn.validReceiveChunks == nil {
580                 cn.validReceiveChunks = make(map[Request]int)
581         }
582         cn.validReceiveChunks[r]++
583         cn.t.pendingRequests[r]++
584         cn.updateExpectingChunks()
585         for _, f := range cn.callbacks.SentRequest {
586                 f(PeerRequestEvent{cn, r})
587         }
588         cn.peerImpl._request(r)
589         return nil
590 }
591
592 func (me *PeerConn) _request(r Request) {
593         me.write(pp.Message{
594                 Type:   pp.Request,
595                 Index:  r.Index,
596                 Begin:  r.Begin,
597                 Length: r.Length,
598         })
599 }
600
601 func (me *Peer) cancel(r Request) {
602         if me.deleteRequest(r) {
603                 me.peerImpl._cancel(r)
604         }
605 }
606
607 func (me *PeerConn) _cancel(r Request) {
608         me.write(makeCancelMessage(r))
609 }
610
611 func (cn *PeerConn) fillWriteBuffer() {
612         if cn.pex.IsEnabled() {
613                 if flow := cn.pex.Share(cn.write); !flow {
614                         return
615                 }
616         }
617         cn.upload(cn.write)
618 }
619
620 // Routine that writes to the peer. Some of what to write is buffered by
621 // activity elsewhere in the Client, and some is determined locally when the
622 // connection is writable.
623 func (cn *PeerConn) writer(keepAliveTimeout time.Duration) {
624         var (
625                 lastWrite      time.Time = time.Now()
626                 keepAliveTimer *time.Timer
627         )
628         keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
629                 cn.locker().Lock()
630                 defer cn.locker().Unlock()
631                 if time.Since(lastWrite) >= keepAliveTimeout {
632                         cn.tickleWriter()
633                 }
634                 keepAliveTimer.Reset(keepAliveTimeout)
635         })
636         cn.locker().Lock()
637         defer cn.locker().Unlock()
638         defer cn.close()
639         defer keepAliveTimer.Stop()
640         frontBuf := new(bytes.Buffer)
641         for {
642                 if cn.closed.IsSet() {
643                         return
644                 }
645                 if cn.writeBuffer.Len() == 0 {
646                         cn.fillWriteBuffer()
647                 }
648                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.useful() {
649                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
650                         torrent.Add("written keepalives", 1)
651                 }
652                 if cn.writeBuffer.Len() == 0 {
653                         // TODO: Minimize wakeups....
654                         cn.writerCond.Wait()
655                         continue
656                 }
657                 // Flip the buffers.
658                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
659                 cn.locker().Unlock()
660                 n, err := cn.w.Write(frontBuf.Bytes())
661                 cn.locker().Lock()
662                 if n != 0 {
663                         lastWrite = time.Now()
664                         keepAliveTimer.Reset(keepAliveTimeout)
665                 }
666                 if err != nil {
667                         cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
668                         return
669                 }
670                 if n != frontBuf.Len() {
671                         panic("short write")
672                 }
673                 frontBuf.Reset()
674         }
675 }
676
677 func (cn *PeerConn) have(piece pieceIndex) {
678         if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
679                 return
680         }
681         cn.write(pp.Message{
682                 Type:  pp.Have,
683                 Index: pp.Integer(piece),
684         })
685         cn.sentHaves.Add(bitmap.BitIndex(piece))
686 }
687
688 func (cn *PeerConn) postBitfield() {
689         if cn.sentHaves.Len() != 0 {
690                 panic("bitfield must be first have-related message sent")
691         }
692         if !cn.t.haveAnyPieces() {
693                 return
694         }
695         cn.write(pp.Message{
696                 Type:     pp.Bitfield,
697                 Bitfield: cn.t.bitfield(),
698         })
699         cn.sentHaves = cn.t._completedPieces.Copy()
700 }
701
702 func (cn *PeerConn) updateRequests() {
703         // log.Print("update requests")
704         cn.tickleWriter()
705 }
706
707 // Emits the indices in the Bitmaps bms in order, never repeating any index.
708 // skip is mutated during execution, and its initial values will never be
709 // emitted.
710 func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
711         return func(cb iter.Callback) {
712                 for _, bm := range bms {
713                         if !iter.All(
714                                 func(_i interface{}) bool {
715                                         i := _i.(int)
716                                         if skip.Contains(i) {
717                                                 return true
718                                         }
719                                         skip.Add(i)
720                                         return cb(i)
721                                 },
722                                 bm.Iter,
723                         ) {
724                                 return
725                         }
726                 }
727         }
728 }
729
730 // check callers updaterequests
731 func (cn *Peer) stopRequestingPiece(piece pieceIndex) bool {
732         return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
733 }
734
735 // This is distinct from Torrent piece priority, which is the user's
736 // preference. Connection piece priority is specific to a connection and is
737 // used to pseudorandomly avoid connections always requesting the same pieces
738 // and thus wasting effort.
739 func (cn *Peer) updatePiecePriority(piece pieceIndex) bool {
740         tpp := cn.t.piecePriority(piece)
741         if !cn.peerHasPiece(piece) {
742                 tpp = PiecePriorityNone
743         }
744         if tpp == PiecePriorityNone {
745                 return cn.stopRequestingPiece(piece)
746         }
747         prio := cn.getPieceInclination()[piece]
748         return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio)
749 }
750
751 func (cn *Peer) getPieceInclination() []int {
752         if cn.pieceInclination == nil {
753                 cn.pieceInclination = cn.t.getConnPieceInclination()
754         }
755         return cn.pieceInclination
756 }
757
758 func (cn *Peer) discardPieceInclination() {
759         if cn.pieceInclination == nil {
760                 return
761         }
762         cn.t.putPieceInclination(cn.pieceInclination)
763         cn.pieceInclination = nil
764 }
765
766 func (cn *Peer) peerPiecesChanged() {
767         if cn.t.haveInfo() {
768                 prioritiesChanged := false
769                 for i := pieceIndex(0); i < cn.t.numPieces(); i++ {
770                         if cn.updatePiecePriority(i) {
771                                 prioritiesChanged = true
772                         }
773                 }
774                 if prioritiesChanged {
775                         cn.updateRequests()
776                 }
777         }
778         cn.t.maybeDropMutuallyCompletePeer(cn)
779 }
780
781 func (cn *PeerConn) raisePeerMinPieces(newMin pieceIndex) {
782         if newMin > cn.peerMinPieces {
783                 cn.peerMinPieces = newMin
784         }
785 }
786
787 func (cn *PeerConn) peerSentHave(piece pieceIndex) error {
788         if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
789                 return errors.New("invalid piece")
790         }
791         if cn.peerHasPiece(piece) {
792                 return nil
793         }
794         cn.raisePeerMinPieces(piece + 1)
795         if !cn.peerHasPiece(piece) {
796                 cn.t.incPieceAvailability(piece)
797         }
798         cn._peerPieces.Set(bitmap.BitIndex(piece), true)
799         cn.t.maybeDropMutuallyCompletePeer(&cn.Peer)
800         if cn.updatePiecePriority(piece) {
801                 cn.updateRequests()
802         }
803         return nil
804 }
805
806 func (cn *PeerConn) peerSentBitfield(bf []bool) error {
807         if len(bf)%8 != 0 {
808                 panic("expected bitfield length divisible by 8")
809         }
810         // We know that the last byte means that at most the last 7 bits are wasted.
811         cn.raisePeerMinPieces(pieceIndex(len(bf) - 7))
812         if cn.t.haveInfo() && len(bf) > int(cn.t.numPieces()) {
813                 // Ignore known excess pieces.
814                 bf = bf[:cn.t.numPieces()]
815         }
816         pp := cn.newPeerPieces()
817         cn.peerSentHaveAll = false
818         for i, have := range bf {
819                 if have {
820                         cn.raisePeerMinPieces(pieceIndex(i) + 1)
821                         if !pp.Contains(i) {
822                                 cn.t.incPieceAvailability(i)
823                         }
824                 } else {
825                         if pp.Contains(i) {
826                                 cn.t.decPieceAvailability(i)
827                         }
828                 }
829                 cn._peerPieces.Set(i, have)
830         }
831         cn.peerPiecesChanged()
832         return nil
833 }
834
835 func (cn *Peer) onPeerHasAllPieces() {
836         t := cn.t
837         if t.haveInfo() {
838                 pp := cn.newPeerPieces()
839                 for i := range iter.N(t.numPieces()) {
840                         if !pp.Contains(i) {
841                                 t.incPieceAvailability(i)
842                         }
843                 }
844         }
845         cn.peerSentHaveAll = true
846         cn._peerPieces.Clear()
847         cn.peerPiecesChanged()
848 }
849
850 func (cn *PeerConn) onPeerSentHaveAll() error {
851         cn.onPeerHasAllPieces()
852         return nil
853 }
854
855 func (cn *PeerConn) peerSentHaveNone() error {
856         cn.t.decPeerPieceAvailability(&cn.Peer)
857         cn._peerPieces.Clear()
858         cn.peerSentHaveAll = false
859         cn.peerPiecesChanged()
860         return nil
861 }
862
863 func (c *PeerConn) requestPendingMetadata() {
864         if c.t.haveInfo() {
865                 return
866         }
867         if c.PeerExtensionIDs[pp.ExtensionNameMetadata] == 0 {
868                 // Peer doesn't support this.
869                 return
870         }
871         // Request metadata pieces that we don't have in a random order.
872         var pending []int
873         for index := 0; index < c.t.metadataPieceCount(); index++ {
874                 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
875                         pending = append(pending, index)
876                 }
877         }
878         rand.Shuffle(len(pending), func(i, j int) { pending[i], pending[j] = pending[j], pending[i] })
879         for _, i := range pending {
880                 c.requestMetadataPiece(i)
881         }
882 }
883
884 func (cn *PeerConn) wroteMsg(msg *pp.Message) {
885         torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
886         if msg.Type == pp.Extended {
887                 for name, id := range cn.PeerExtensionIDs {
888                         if id != msg.ExtendedID {
889                                 continue
890                         }
891                         torrent.Add(fmt.Sprintf("Extended messages written for protocol %q", name), 1)
892                 }
893         }
894         cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
895 }
896
897 func (cn *PeerConn) readMsg(msg *pp.Message) {
898         cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
899 }
900
901 // After handshake, we know what Torrent and Client stats to include for a
902 // connection.
903 func (cn *Peer) postHandshakeStats(f func(*ConnStats)) {
904         t := cn.t
905         f(&t.stats)
906         f(&t.cl.stats)
907 }
908
909 // All ConnStats that include this connection. Some objects are not known
910 // until the handshake is complete, after which it's expected to reconcile the
911 // differences.
912 func (cn *Peer) allStats(f func(*ConnStats)) {
913         f(&cn._stats)
914         if cn.reconciledHandshakeStats {
915                 cn.postHandshakeStats(f)
916         }
917 }
918
919 func (cn *PeerConn) wroteBytes(n int64) {
920         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
921 }
922
923 func (cn *PeerConn) readBytes(n int64) {
924         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
925 }
926
927 // Returns whether the connection could be useful to us. We're seeding and
928 // they want data, we don't have metainfo and they can provide it, etc.
929 func (c *Peer) useful() bool {
930         t := c.t
931         if c.closed.IsSet() {
932                 return false
933         }
934         if !t.haveInfo() {
935                 return c.supportsExtension("ut_metadata")
936         }
937         if t.seeding() && c.peerInterested {
938                 return true
939         }
940         if c.peerHasWantedPieces() {
941                 return true
942         }
943         return false
944 }
945
946 func (c *Peer) lastHelpful() (ret time.Time) {
947         ret = c.lastUsefulChunkReceived
948         if c.t.seeding() && c.lastChunkSent.After(ret) {
949                 ret = c.lastChunkSent
950         }
951         return
952 }
953
954 func (c *PeerConn) fastEnabled() bool {
955         return c.PeerExtensionBytes.SupportsFast() && c.t.cl.config.Extensions.SupportsFast()
956 }
957
958 func (c *PeerConn) reject(r Request) {
959         if !c.fastEnabled() {
960                 panic("fast not enabled")
961         }
962         c.write(r.ToMsg(pp.Reject))
963         delete(c.peerRequests, r)
964 }
965
966 func (c *PeerConn) onReadRequest(r Request) error {
967         requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
968         if _, ok := c.peerRequests[r]; ok {
969                 torrent.Add("duplicate requests received", 1)
970                 return nil
971         }
972         if c.choking {
973                 torrent.Add("requests received while choking", 1)
974                 if c.fastEnabled() {
975                         torrent.Add("requests rejected while choking", 1)
976                         c.reject(r)
977                 }
978                 return nil
979         }
980         if len(c.peerRequests) >= maxRequests {
981                 torrent.Add("requests received while queue full", 1)
982                 if c.fastEnabled() {
983                         c.reject(r)
984                 }
985                 // BEP 6 says we may close here if we choose.
986                 return nil
987         }
988         if !c.t.havePiece(pieceIndex(r.Index)) {
989                 // This isn't necessarily them screwing up. We can drop pieces
990                 // from our storage, and can't communicate this to peers
991                 // except by reconnecting.
992                 requestsReceivedForMissingPieces.Add(1)
993                 return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
994         }
995         // Check this after we know we have the piece, so that the piece length will be known.
996         if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) {
997                 torrent.Add("bad requests received", 1)
998                 return errors.New("bad Request")
999         }
1000         if c.peerRequests == nil {
1001                 c.peerRequests = make(map[Request]*peerRequestState, maxRequests)
1002         }
1003         value := &peerRequestState{}
1004         c.peerRequests[r] = value
1005         go c.peerRequestDataReader(r, value)
1006         //c.tickleWriter()
1007         return nil
1008 }
1009
1010 func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
1011         b, err := readPeerRequestData(r, c)
1012         c.locker().Lock()
1013         defer c.locker().Unlock()
1014         if err != nil {
1015                 c.peerRequestDataReadFailed(err, r)
1016         } else {
1017                 if b == nil {
1018                         panic("data must be non-nil to trigger send")
1019                 }
1020                 prs.data = b
1021                 c.tickleWriter()
1022         }
1023 }
1024
1025 // If this is maintained correctly, we might be able to support optional synchronous reading for
1026 // chunk sending, the way it used to work.
1027 func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
1028         c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer Request %v: %v", r, err)
1029         i := pieceIndex(r.Index)
1030         if c.t.pieceComplete(i) {
1031                 // There used to be more code here that just duplicated the following break. Piece
1032                 // completions are currently cached, so I'm not sure how helpful this update is, except to
1033                 // pull any completion changes pushed to the storage backend in failed reads that got us
1034                 // here.
1035                 c.t.updatePieceCompletion(i)
1036         }
1037         // If we failed to send a chunk, choke the peer to ensure they flush all their requests. We've
1038         // probably dropped a piece from storage, but there's no way to communicate this to the peer. If
1039         // they ask for it again, we'll kick them to allow us to send them an updated bitfield on the
1040         // next connect. TODO: Support rejecting here too.
1041         if c.choking {
1042                 c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
1043         }
1044         c.choke(c.write)
1045 }
1046
1047 func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
1048         b := make([]byte, r.Length)
1049         p := c.t.info.Piece(int(r.Index))
1050         n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
1051         if n == len(b) {
1052                 if err == io.EOF {
1053                         err = nil
1054                 }
1055         } else {
1056                 if err == nil {
1057                         panic("expected error")
1058                 }
1059         }
1060         return b, err
1061 }
1062
1063 func runSafeExtraneous(f func()) {
1064         if true {
1065                 go f()
1066         } else {
1067                 f()
1068         }
1069 }
1070
1071 // Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and
1072 // exit. Returning will end the connection.
1073 func (c *PeerConn) mainReadLoop() (err error) {
1074         defer func() {
1075                 if err != nil {
1076                         torrent.Add("connection.mainReadLoop returned with error", 1)
1077                 } else {
1078                         torrent.Add("connection.mainReadLoop returned with no error", 1)
1079                 }
1080         }()
1081         t := c.t
1082         cl := t.cl
1083
1084         decoder := pp.Decoder{
1085                 R:         bufio.NewReaderSize(c.r, 1<<17),
1086                 MaxLength: 256 * 1024,
1087                 Pool:      t.chunkPool,
1088         }
1089         for {
1090                 var msg pp.Message
1091                 func() {
1092                         cl.unlock()
1093                         defer cl.lock()
1094                         err = decoder.Decode(&msg)
1095                 }()
1096                 if cb := c.callbacks.ReadMessage; cb != nil && err == nil {
1097                         cb(c, &msg)
1098                 }
1099                 if t.closed.IsSet() || c.closed.IsSet() {
1100                         return nil
1101                 }
1102                 if err != nil {
1103                         return err
1104                 }
1105                 c.readMsg(&msg)
1106                 c.lastMessageReceived = time.Now()
1107                 if msg.Keepalive {
1108                         receivedKeepalives.Add(1)
1109                         continue
1110                 }
1111                 messageTypesReceived.Add(msg.Type.String(), 1)
1112                 if msg.Type.FastExtension() && !c.fastEnabled() {
1113                         runSafeExtraneous(func() { torrent.Add("fast messages received when extension is disabled", 1) })
1114                         return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
1115                 }
1116                 switch msg.Type {
1117                 case pp.Choke:
1118                         c.peerChoking = true
1119                         if !c.fastEnabled() {
1120                                 c.deleteAllRequests()
1121                         }
1122                         // We can then reset our interest.
1123                         c.updateRequests()
1124                         c.updateExpectingChunks()
1125                 case pp.Unchoke:
1126                         c.peerChoking = false
1127                         c.tickleWriter()
1128                         c.updateExpectingChunks()
1129                 case pp.Interested:
1130                         c.peerInterested = true
1131                         c.tickleWriter()
1132                 case pp.NotInterested:
1133                         c.peerInterested = false
1134                         // We don't clear their requests since it isn't clear in the spec.
1135                         // We'll probably choke them for this, which will clear them if
1136                         // appropriate, and is clearly specified.
1137                 case pp.Have:
1138                         err = c.peerSentHave(pieceIndex(msg.Index))
1139                 case pp.Bitfield:
1140                         err = c.peerSentBitfield(msg.Bitfield)
1141                 case pp.Request:
1142                         r := newRequestFromMessage(&msg)
1143                         err = c.onReadRequest(r)
1144                 case pp.Piece:
1145                         err = c.receiveChunk(&msg)
1146                         if len(msg.Piece) == int(t.chunkSize) {
1147                                 t.chunkPool.Put(&msg.Piece)
1148                         }
1149                         if err != nil {
1150                                 err = fmt.Errorf("receiving chunk: %s", err)
1151                         }
1152                 case pp.Cancel:
1153                         req := newRequestFromMessage(&msg)
1154                         c.onPeerSentCancel(req)
1155                 case pp.Port:
1156                         ipa, ok := tryIpPortFromNetAddr(c.RemoteAddr)
1157                         if !ok {
1158                                 break
1159                         }
1160                         pingAddr := net.UDPAddr{
1161                                 IP:   ipa.IP,
1162                                 Port: ipa.Port,
1163                         }
1164                         if msg.Port != 0 {
1165                                 pingAddr.Port = int(msg.Port)
1166                         }
1167                         cl.eachDhtServer(func(s DhtServer) {
1168                                 go s.Ping(&pingAddr)
1169                         })
1170                 case pp.Suggest:
1171                         torrent.Add("suggests received", 1)
1172                         log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).SetLevel(log.Debug).Log(c.t.logger)
1173                         c.updateRequests()
1174                 case pp.HaveAll:
1175                         err = c.onPeerSentHaveAll()
1176                 case pp.HaveNone:
1177                         err = c.peerSentHaveNone()
1178                 case pp.Reject:
1179                         c.remoteRejectedRequest(newRequestFromMessage(&msg))
1180                 case pp.AllowedFast:
1181                         torrent.Add("allowed fasts received", 1)
1182                         log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
1183                         c.peerAllowedFast.Add(int(msg.Index))
1184                         c.updateRequests()
1185                 case pp.Extended:
1186                         err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
1187                 default:
1188                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1189                 }
1190                 if err != nil {
1191                         return err
1192                 }
1193         }
1194 }
1195
1196 func (c *Peer) remoteRejectedRequest(r Request) {
1197         if c.deleteRequest(r) {
1198                 c.decExpectedChunkReceive(r)
1199         }
1200 }
1201
1202 func (c *Peer) decExpectedChunkReceive(r Request) {
1203         count := c.validReceiveChunks[r]
1204         if count == 1 {
1205                 delete(c.validReceiveChunks, r)
1206         } else if count > 1 {
1207                 c.validReceiveChunks[r] = count - 1
1208         } else {
1209                 panic(r)
1210         }
1211 }
1212
1213 func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) {
1214         defer func() {
1215                 // TODO: Should we still do this?
1216                 if err != nil {
1217                         // These clients use their own extension IDs for outgoing message
1218                         // types, which is incorrect.
1219                         if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1220                                 err = nil
1221                         }
1222                 }
1223         }()
1224         t := c.t
1225         cl := t.cl
1226         switch id {
1227         case pp.HandshakeExtendedID:
1228                 var d pp.ExtendedHandshakeMessage
1229                 if err := bencode.Unmarshal(payload, &d); err != nil {
1230                         c.logger.Printf("error parsing extended handshake message %q: %s", payload, err)
1231                         return fmt.Errorf("unmarshalling extended handshake payload: %w", err)
1232                 }
1233                 if cb := c.callbacks.ReadExtendedHandshake; cb != nil {
1234                         cb(c, &d)
1235                 }
1236                 //c.logger.WithDefaultLevel(log.Debug).Printf("received extended handshake message:\n%s", spew.Sdump(d))
1237                 if d.Reqq != 0 {
1238                         c.PeerMaxRequests = d.Reqq
1239                 }
1240                 c.PeerClientName = d.V
1241                 if c.PeerExtensionIDs == nil {
1242                         c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber, len(d.M))
1243                 }
1244                 c.PeerListenPort = d.Port
1245                 c.PeerPrefersEncryption = d.Encryption
1246                 for name, id := range d.M {
1247                         if _, ok := c.PeerExtensionIDs[name]; !ok {
1248                                 peersSupportingExtension.Add(string(name), 1)
1249                         }
1250                         c.PeerExtensionIDs[name] = id
1251                 }
1252                 if d.MetadataSize != 0 {
1253                         if err = t.setMetadataSize(d.MetadataSize); err != nil {
1254                                 return fmt.Errorf("setting metadata size to %d: %w", d.MetadataSize, err)
1255                         }
1256                 }
1257                 c.requestPendingMetadata()
1258                 if !t.cl.config.DisablePEX {
1259                         t.pex.Add(c) // we learnt enough now
1260                         c.pex.Init(c)
1261                 }
1262                 return nil
1263         case metadataExtendedId:
1264                 err := cl.gotMetadataExtensionMsg(payload, t, c)
1265                 if err != nil {
1266                         return fmt.Errorf("handling metadata extension message: %w", err)
1267                 }
1268                 return nil
1269         case pexExtendedId:
1270                 if !c.pex.IsEnabled() {
1271                         return nil // or hang-up maybe?
1272                 }
1273                 return c.pex.Recv(payload)
1274         default:
1275                 return fmt.Errorf("unexpected extended message ID: %v", id)
1276         }
1277 }
1278
1279 // Set both the Reader and Writer for the connection from a single ReadWriter.
1280 func (cn *PeerConn) setRW(rw io.ReadWriter) {
1281         cn.r = rw
1282         cn.w = rw
1283 }
1284
1285 // Returns the Reader and Writer as a combined ReadWriter.
1286 func (cn *PeerConn) rw() io.ReadWriter {
1287         return struct {
1288                 io.Reader
1289                 io.Writer
1290         }{cn.r, cn.w}
1291 }
1292
1293 // Handle a received chunk from a peer.
1294 func (c *Peer) receiveChunk(msg *pp.Message) error {
1295         t := c.t
1296         cl := t.cl
1297         chunksReceived.Add("total", 1)
1298
1299         req := newRequestFromMessage(msg)
1300
1301         if c.peerChoking {
1302                 chunksReceived.Add("while choked", 1)
1303         }
1304
1305         if c.validReceiveChunks[req] <= 0 {
1306                 chunksReceived.Add("unexpected", 1)
1307                 return errors.New("received unexpected chunk")
1308         }
1309         c.decExpectedChunkReceive(req)
1310
1311         if c.peerChoking && c.peerAllowedFast.Get(int(req.Index)) {
1312                 chunksReceived.Add("due to allowed fast", 1)
1313         }
1314
1315         // The request needs to be deleted immediately to prevent cancels occurring asynchronously when
1316         // have actually already received the piece, while we have the Client unlocked to write the data
1317         // out.
1318         deletedRequest := false
1319         {
1320                 if _, ok := c.requests[req]; ok {
1321                         for _, f := range c.callbacks.ReceivedRequested {
1322                                 f(PeerMessageEvent{c, msg})
1323                         }
1324                 }
1325                 // Request has been satisfied.
1326                 if c.deleteRequest(req) {
1327                         deletedRequest = true
1328                         if !c.peerChoking {
1329                                 c._chunksReceivedWhileExpecting++
1330                         }
1331                 } else {
1332                         chunksReceived.Add("unwanted", 1)
1333                 }
1334         }
1335
1336         // Do we actually want this chunk?
1337         if t.haveChunk(req) {
1338                 chunksReceived.Add("wasted", 1)
1339                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
1340                 return nil
1341         }
1342
1343         piece := &t.pieces[req.Index]
1344
1345         c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
1346         c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
1347         if deletedRequest {
1348                 c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
1349         }
1350         for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
1351                 f(ReceivedUsefulDataEvent{c, msg})
1352         }
1353         c.lastUsefulChunkReceived = time.Now()
1354
1355         // Need to record that it hasn't been written yet, before we attempt to do
1356         // anything with it.
1357         piece.incrementPendingWrites()
1358         // Record that we have the chunk, so we aren't trying to download it while
1359         // waiting for it to be written to storage.
1360         piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize))
1361
1362         // Cancel pending requests for this chunk from *other* peers.
1363         t.iterPeers(func(p *Peer) {
1364                 if p == c {
1365                         return
1366                 }
1367                 p.cancel(req)
1368         })
1369
1370         err := func() error {
1371                 cl.unlock()
1372                 defer cl.lock()
1373                 concurrentChunkWrites.Add(1)
1374                 defer concurrentChunkWrites.Add(-1)
1375                 // Write the chunk out. Note that the upper bound on chunk writing concurrency will be the
1376                 // number of connections. We write inline with receiving the chunk (with this lock dance),
1377                 // because we want to handle errors synchronously and I haven't thought of a nice way to
1378                 // defer any concurrency to the storage and have that notify the client of errors. TODO: Do
1379                 // that instead.
1380                 return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1381         }()
1382
1383         piece.decrementPendingWrites()
1384
1385         if err != nil {
1386                 c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err)
1387                 t.pendRequest(req)
1388                 //t.updatePieceCompletion(pieceIndex(msg.Index))
1389                 t.onWriteChunkErr(err)
1390                 return nil
1391         }
1392
1393         c.onDirtiedPiece(pieceIndex(req.Index))
1394
1395         // We need to ensure the piece is only queued once, so only the last chunk writer gets this job.
1396         if t.pieceAllDirty(pieceIndex(req.Index)) && piece.pendingWrites == 0 {
1397                 t.queuePieceCheck(pieceIndex(req.Index))
1398                 // We don't pend all chunks here anymore because we don't want code dependent on the dirty
1399                 // chunk status (such as the haveChunk call above) to have to check all the various other
1400                 // piece states like queued for hash, hashing etc. This does mean that we need to be sure
1401                 // that chunk pieces are pended at an appropriate time later however.
1402         }
1403
1404         cl.event.Broadcast()
1405         // We do this because we've written a chunk, and may change PieceState.Partial.
1406         t.publishPieceChange(pieceIndex(req.Index))
1407
1408         return nil
1409 }
1410
1411 func (c *Peer) onDirtiedPiece(piece pieceIndex) {
1412         if c.peerTouchedPieces == nil {
1413                 c.peerTouchedPieces = make(map[pieceIndex]struct{})
1414         }
1415         c.peerTouchedPieces[piece] = struct{}{}
1416         ds := &c.t.pieces[piece].dirtiers
1417         if *ds == nil {
1418                 *ds = make(map[*Peer]struct{})
1419         }
1420         (*ds)[c] = struct{}{}
1421 }
1422
1423 func (c *PeerConn) uploadAllowed() bool {
1424         if c.t.cl.config.NoUpload {
1425                 return false
1426         }
1427         if c.t.dataUploadDisallowed {
1428                 return false
1429         }
1430         if c.t.seeding() {
1431                 return true
1432         }
1433         if !c.peerHasWantedPieces() {
1434                 return false
1435         }
1436         // Don't upload more than 100 KiB more than we download.
1437         if c._stats.BytesWrittenData.Int64() >= c._stats.BytesReadData.Int64()+100<<10 {
1438                 return false
1439         }
1440         return true
1441 }
1442
1443 func (c *PeerConn) setRetryUploadTimer(delay time.Duration) {
1444         if c.uploadTimer == nil {
1445                 c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
1446         } else {
1447                 c.uploadTimer.Reset(delay)
1448         }
1449 }
1450
1451 // Also handles choking and unchoking of the remote peer.
1452 func (c *PeerConn) upload(msg func(pp.Message) bool) bool {
1453         // Breaking or completing this loop means we don't want to upload to the
1454         // peer anymore, and we choke them.
1455 another:
1456         for c.uploadAllowed() {
1457                 // We want to upload to the peer.
1458                 if !c.unchoke(msg) {
1459                         return false
1460                 }
1461                 for r, state := range c.peerRequests {
1462                         if state.data == nil {
1463                                 continue
1464                         }
1465                         res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
1466                         if !res.OK() {
1467                                 panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
1468                         }
1469                         delay := res.Delay()
1470                         if delay > 0 {
1471                                 res.Cancel()
1472                                 c.setRetryUploadTimer(delay)
1473                                 // Hard to say what to return here.
1474                                 return true
1475                         }
1476                         more := c.sendChunk(r, msg, state)
1477                         delete(c.peerRequests, r)
1478                         if !more {
1479                                 return false
1480                         }
1481                         goto another
1482                 }
1483                 return true
1484         }
1485         return c.choke(msg)
1486 }
1487
1488 func (cn *PeerConn) drop() {
1489         cn.t.dropConnection(cn)
1490 }
1491
1492 func (cn *Peer) netGoodPiecesDirtied() int64 {
1493         return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
1494 }
1495
1496 func (c *Peer) peerHasWantedPieces() bool {
1497         return !c._pieceRequestOrder.IsEmpty()
1498 }
1499
1500 func (c *Peer) numLocalRequests() int {
1501         return len(c.requests)
1502 }
1503
1504 func (c *Peer) deleteRequest(r Request) bool {
1505         if _, ok := c.requests[r]; !ok {
1506                 return false
1507         }
1508         delete(c.requests, r)
1509         for _, f := range c.callbacks.DeletedRequest {
1510                 f(PeerRequestEvent{c, r})
1511         }
1512         c.updateExpectingChunks()
1513         pr := c.t.pendingRequests
1514         pr[r]--
1515         n := pr[r]
1516         if n == 0 {
1517                 delete(pr, r)
1518         }
1519         if n < 0 {
1520                 panic(n)
1521         }
1522         return true
1523 }
1524
1525 func (c *Peer) deleteAllRequests() {
1526         for r := range c.requests {
1527                 c.deleteRequest(r)
1528         }
1529         if len(c.requests) != 0 {
1530                 panic(len(c.requests))
1531         }
1532         // for c := range c.t.conns {
1533         //      c.tickleWriter()
1534         // }
1535 }
1536
1537 // This is called when something has changed that should wake the writer, such as putting stuff into
1538 // the writeBuffer, or changing some state that the writer can act on.
1539 func (c *PeerConn) tickleWriter() {
1540         c.writerCond.Broadcast()
1541 }
1542
1543 func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
1544         c.lastChunkSent = time.Now()
1545         return msg(pp.Message{
1546                 Type:  pp.Piece,
1547                 Index: r.Index,
1548                 Begin: r.Begin,
1549                 Piece: state.data,
1550         })
1551 }
1552
1553 func (c *PeerConn) setTorrent(t *Torrent) {
1554         if c.t != nil {
1555                 panic("connection already associated with a torrent")
1556         }
1557         c.t = t
1558         c.logger.WithDefaultLevel(log.Debug).Printf("set torrent=%v", t)
1559         t.reconcileHandshakeStats(c)
1560 }
1561
1562 func (c *Peer) peerPriority() (peerPriority, error) {
1563         return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
1564 }
1565
1566 func (c *Peer) remoteIp() net.IP {
1567         host, _, _ := net.SplitHostPort(c.RemoteAddr.String())
1568         return net.ParseIP(host)
1569 }
1570
1571 func (c *Peer) remoteIpPort() IpPort {
1572         ipa, _ := tryIpPortFromNetAddr(c.RemoteAddr)
1573         return IpPort{ipa.IP, uint16(ipa.Port)}
1574 }
1575
1576 func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags {
1577         f := pp.PexPeerFlags(0)
1578         if c.PeerPrefersEncryption {
1579                 f |= pp.PexPrefersEncryption
1580         }
1581         if c.outgoing {
1582                 f |= pp.PexOutgoingConn
1583         }
1584         if c.utp() {
1585                 f |= pp.PexSupportsUtp
1586         }
1587         return f
1588 }
1589
1590 // This returns the address to use if we want to dial the peer again. It incorporates the peer's
1591 // advertised listen port.
1592 func (c *PeerConn) dialAddr() PeerRemoteAddr {
1593         if !c.outgoing && c.PeerListenPort != 0 {
1594                 switch addr := c.RemoteAddr.(type) {
1595                 case *net.TCPAddr:
1596                         dialAddr := *addr
1597                         dialAddr.Port = c.PeerListenPort
1598                         return &dialAddr
1599                 case *net.UDPAddr:
1600                         dialAddr := *addr
1601                         dialAddr.Port = c.PeerListenPort
1602                         return &dialAddr
1603                 }
1604         }
1605         return c.RemoteAddr
1606 }
1607
1608 func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
1609         f := c.pexPeerFlags()
1610         addr := c.dialAddr()
1611         return pexEvent{t, addr, f}
1612 }
1613
1614 func (c *PeerConn) String() string {
1615         return fmt.Sprintf("connection %p", c)
1616 }
1617
1618 func (c *Peer) trust() connectionTrust {
1619         return connectionTrust{c.trusted, c.netGoodPiecesDirtied()}
1620 }
1621
1622 type connectionTrust struct {
1623         Implicit            bool
1624         NetGoodPiecesDirted int64
1625 }
1626
1627 func (l connectionTrust) Less(r connectionTrust) bool {
1628         return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
1629 }
1630
1631 func (cn *Peer) peerMaxRequests() int {
1632         return cn.PeerMaxRequests
1633 }
1634
1635 // Returns the pieces the peer has claimed to have.
1636 func (cn *PeerConn) PeerPieces() bitmap.Bitmap {
1637         cn.locker().RLock()
1638         defer cn.locker().RUnlock()
1639         return cn.newPeerPieces()
1640 }
1641
1642 // Returns a new Bitmap that includes bits for all pieces we have.
1643 func (cn *Peer) newPeerPieces() bitmap.Bitmap {
1644         ret := cn._peerPieces.Copy()
1645         if cn.peerSentHaveAll {
1646                 ret.AddRange(0, cn.t.numPieces())
1647         }
1648         return ret
1649 }
1650
1651 func (cn *Peer) pieceRequestOrder() *prioritybitmap.PriorityBitmap {
1652         return &cn._pieceRequestOrder
1653 }
1654
1655 func (cn *Peer) stats() *ConnStats {
1656         return &cn._stats
1657 }
1658
1659 func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
1660         pc, ok := p.peerImpl.(*PeerConn)
1661         return pc, ok
1662 }