]> Sergey Matveev's repositories - btrtrc.git/blob - peer.go
cmd/btrtrc client
[btrtrc.git] / peer.go
1 package torrent
2
3 import (
4         "errors"
5         "fmt"
6         "io"
7         "net"
8         "strings"
9         "sync"
10         "time"
11
12         "github.com/RoaringBitmap/roaring"
13         "github.com/anacrolix/chansync"
14         . "github.com/anacrolix/generics"
15         "github.com/anacrolix/log"
16         "github.com/anacrolix/missinggo/iter"
17         "github.com/anacrolix/missinggo/v2/bitmap"
18         "github.com/anacrolix/multiless"
19
20         "github.com/anacrolix/torrent/internal/alloclim"
21         "github.com/anacrolix/torrent/mse"
22         pp "github.com/anacrolix/torrent/peer_protocol"
23         request_strategy "github.com/anacrolix/torrent/request-strategy"
24         typedRoaring "github.com/anacrolix/torrent/typed-roaring"
25 )
26
27 type (
28         Peer struct {
29                 // First to ensure 64-bit alignment for atomics. See #262.
30                 _stats ConnStats
31
32                 t *Torrent
33
34                 peerImpl
35                 callbacks *Callbacks
36
37                 outgoing   bool
38                 Network    string
39                 RemoteAddr PeerRemoteAddr
40                 // The local address as observed by the remote peer. WebRTC seems to get this right without needing hints from the
41                 // config.
42                 localPublicAddr peerLocalPublicAddr
43                 bannableAddr    Option[bannableAddr]
44                 // True if the connection is operating over MSE obfuscation.
45                 headerEncrypted bool
46                 cryptoMethod    mse.CryptoMethod
47                 Discovery       PeerSource
48                 trusted         bool
49                 closed          chansync.SetOnce
50                 // Set true after we've added our ConnStats generated during handshake to
51                 // other ConnStat instances as determined when the *Torrent became known.
52                 reconciledHandshakeStats bool
53
54                 lastMessageReceived     time.Time
55                 completedHandshake      time.Time
56                 lastUsefulChunkReceived time.Time
57                 lastChunkSent           time.Time
58
59                 // Stuff controlled by the local peer.
60                 needRequestUpdate    string
61                 requestState         request_strategy.PeerRequestState
62                 updateRequestsTimer  *time.Timer
63                 lastRequestUpdate    time.Time
64                 peakRequests         maxRequests
65                 lastBecameInterested time.Time
66                 priorInterest        time.Duration
67
68                 lastStartedExpectingToReceiveChunks time.Time
69                 cumulativeExpectedToReceiveChunks   time.Duration
70                 _chunksReceivedWhileExpecting       int64
71
72                 choking                                bool
73                 piecesReceivedSinceLastRequestUpdate   maxRequests
74                 maxPiecesReceivedBetweenRequestUpdates maxRequests
75                 // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
76                 // and implementation differences, we may receive chunks that are no longer in the set of
77                 // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
78                 validReceiveChunks map[RequestIndex]int
79                 // Indexed by metadata piece, set to true if posted and pending a
80                 // response.
81                 metadataRequests []bool
82                 sentHaves        bitmap.Bitmap
83
84                 // Stuff controlled by the remote peer.
85                 peerInterested        bool
86                 peerChoking           bool
87                 peerRequests          map[Request]*peerRequestState
88                 PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
89                 // The highest possible number of pieces the torrent could have based on
90                 // communication with the peer. Generally only useful until we have the
91                 // torrent info.
92                 peerMinPieces pieceIndex
93                 // Pieces we've accepted chunks for from the peer.
94                 peerTouchedPieces map[pieceIndex]struct{}
95                 peerAllowedFast   typedRoaring.Bitmap[pieceIndex]
96
97                 PeerMaxRequests maxRequests // Maximum pending requests the peer allows.
98
99                 logger log.Logger
100         }
101
102         PeerSource string
103
104         peerRequestState struct {
105                 data             []byte
106                 allocReservation *alloclim.Reservation
107         }
108
109         PeerRemoteAddr interface {
110                 String() string
111         }
112
113         peerRequests = orderedBitmap[RequestIndex]
114 )
115
116 const (
117         PeerSourceUtHolepunch     = "C"
118         PeerSourceTracker         = "Tr"
119         PeerSourceIncoming        = "I"
120         PeerSourceDhtGetPeers     = "Hg" // Peers we found by searching a DHT.
121         PeerSourceDhtAnnouncePeer = "Ha" // Peers that were announced to us by a DHT.
122         PeerSourcePex             = "X"
123         // The peer was given directly, such as through a magnet link.
124         PeerSourceDirect = "M"
125 )
126
127 // Returns the Torrent a Peer belongs to. Shouldn't change for the lifetime of the Peer. May be nil
128 // if we are the receiving end of a connection and the handshake hasn't been received or accepted
129 // yet.
130 func (p *Peer) Torrent() *Torrent {
131         return p.t
132 }
133
134 func (p *Peer) initRequestState() {
135         p.requestState.Requests = &peerRequests{}
136 }
137
138 func (cn *Peer) updateExpectingChunks() {
139         if cn.expectingChunks() {
140                 if cn.lastStartedExpectingToReceiveChunks.IsZero() {
141                         cn.lastStartedExpectingToReceiveChunks = time.Now()
142                 }
143         } else {
144                 if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
145                         cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
146                         cn.lastStartedExpectingToReceiveChunks = time.Time{}
147                 }
148         }
149 }
150
151 func (cn *Peer) expectingChunks() bool {
152         if cn.requestState.Requests.IsEmpty() {
153                 return false
154         }
155         if !cn.requestState.Interested {
156                 return false
157         }
158         if !cn.peerChoking {
159                 return true
160         }
161         haveAllowedFastRequests := false
162         cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
163                 haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
164                         cn.requestState.Requests,
165                         cn.t.pieceRequestIndexOffset(i),
166                         cn.t.pieceRequestIndexOffset(i+1),
167                 ) == 0
168                 return !haveAllowedFastRequests
169         })
170         return haveAllowedFastRequests
171 }
172
173 func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
174         return cn.peerChoking && !cn.peerAllowedFast.Contains(piece)
175 }
176
177 func (cn *Peer) cumInterest() time.Duration {
178         ret := cn.priorInterest
179         if cn.requestState.Interested {
180                 ret += time.Since(cn.lastBecameInterested)
181         }
182         return ret
183 }
184
185 func (cn *Peer) locker() *lockWithDeferreds {
186         return cn.t.cl.locker()
187 }
188
189 func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
190         _, ok := cn.PeerExtensionIDs[ext]
191         return ok
192 }
193
194 // The best guess at number of pieces in the torrent for this peer.
195 func (cn *Peer) bestPeerNumPieces() pieceIndex {
196         if cn.t.haveInfo() {
197                 return cn.t.numPieces()
198         }
199         return cn.peerMinPieces
200 }
201
202 func (cn *Peer) completedString() string {
203         have := pieceIndex(cn.peerPieces().GetCardinality())
204         if all, _ := cn.peerHasAllPieces(); all {
205                 have = cn.bestPeerNumPieces()
206         }
207         return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
208 }
209
210 func eventAgeString(t time.Time) string {
211         if t.IsZero() {
212                 return "never"
213         }
214         return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
215 }
216
217 // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
218 func (cn *Peer) statusFlags() (ret string) {
219         c := func(b byte) {
220                 ret += string([]byte{b})
221         }
222         if cn.requestState.Interested {
223                 c('i')
224         }
225         if cn.choking {
226                 c('c')
227         }
228         c(':')
229         ret += cn.connectionFlags()
230         c(':')
231         if cn.peerInterested {
232                 c('i')
233         }
234         if cn.peerChoking {
235                 c('c')
236         }
237         return
238 }
239
240 func (cn *Peer) StatusFlags() string {
241         return cn.statusFlags()
242 }
243
244 func (cn *Peer) downloadRate() float64 {
245         num := cn._stats.BytesReadUsefulData.Int64()
246         if num == 0 {
247                 return 0
248         }
249         return float64(num) / cn.totalExpectingTime().Seconds()
250 }
251
252 func (p *Peer) DownloadRate() float64 {
253         p.locker().RLock()
254         defer p.locker().RUnlock()
255
256         return p.downloadRate()
257 }
258
259 func (cn *Peer) UploadRate() float64 {
260         cn.locker().RLock()
261         defer cn.locker().RUnlock()
262         num := cn._stats.BytesWrittenData.Int64()
263         if num == 0 {
264                 return 0
265         }
266         return float64(num) / time.Now().Sub(cn.completedHandshake).Seconds()
267 }
268
269 func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
270         var last Option[pieceIndex]
271         var count int
272         next := func(item Option[pieceIndex]) {
273                 if item == last {
274                         count++
275                 } else {
276                         if count != 0 {
277                                 f(last.Value, count)
278                         }
279                         last = item
280                         count = 1
281                 }
282         }
283         cn.requestState.Requests.Iterate(func(requestIndex request_strategy.RequestIndex) bool {
284                 next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
285                 return true
286         })
287         next(None[pieceIndex]())
288 }
289
290 func (cn *Peer) writeStatus(w io.Writer) {
291         // \t isn't preserved in <pre> blocks?
292         if cn.closed.IsSet() {
293                 fmt.Fprint(w, "CLOSED: ")
294         }
295         fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n"))
296         prio, err := cn.peerPriority()
297         prioStr := fmt.Sprintf("%08x", prio)
298         if err != nil {
299                 prioStr += ": " + err.Error()
300         }
301         fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
302         fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
303                 eventAgeString(cn.lastMessageReceived),
304                 eventAgeString(cn.completedHandshake),
305                 eventAgeString(cn.lastHelpful()),
306                 cn.cumInterest(),
307                 cn.totalExpectingTime(),
308         )
309         fmt.Fprintf(w,
310                 "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
311                 cn.completedString(),
312                 len(cn.peerTouchedPieces),
313                 &cn._stats.ChunksReadUseful,
314                 &cn._stats.ChunksRead,
315                 &cn._stats.ChunksWritten,
316                 cn.requestState.Requests.GetCardinality(),
317                 cn.requestState.Cancelled.GetCardinality(),
318                 cn.nominalMaxRequests(),
319                 cn.PeerMaxRequests,
320                 len(cn.peerRequests),
321                 localClientReqq,
322                 cn.statusFlags(),
323                 cn.downloadRate()/(1<<10),
324         )
325         fmt.Fprintf(w, "requested pieces:")
326         cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
327                 fmt.Fprintf(w, " %v(%v)", piece, count)
328         })
329         fmt.Fprintf(w, "\n")
330 }
331
332 func (p *Peer) close() {
333         if !p.closed.Set() {
334                 return
335         }
336         if p.updateRequestsTimer != nil {
337                 p.updateRequestsTimer.Stop()
338         }
339         for _, prs := range p.peerRequests {
340                 prs.allocReservation.Drop()
341         }
342         p.peerImpl.onClose()
343         if p.t != nil {
344                 p.t.decPeerPieceAvailability(p)
345         }
346         for _, f := range p.callbacks.PeerClosed {
347                 f(p)
348         }
349 }
350
351 func (p *Peer) Close() error {
352         p.locker().Lock()
353         defer p.locker().Unlock()
354         p.close()
355         return nil
356 }
357
358 // Peer definitely has a piece, for purposes of requesting. So it's not sufficient that we think
359 // they do (known=true).
360 func (cn *Peer) peerHasPiece(piece pieceIndex) bool {
361         if all, known := cn.peerHasAllPieces(); all && known {
362                 return true
363         }
364         return cn.peerPieces().ContainsInt(piece)
365 }
366
367 // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
368 // https://github.com/pion/datachannel/issues/59 is fixed.
369 const (
370         writeBufferHighWaterLen = 1 << 15
371         writeBufferLowWaterLen  = writeBufferHighWaterLen / 2
372 )
373
374 var (
375         interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary())
376         requestMsgLen    = len(pp.Message{Type: pp.Request}.MustMarshalBinary())
377         // This is the maximum request count that could fit in the write buffer if it's at or below the
378         // low water mark when we run maybeUpdateActualRequestState.
379         maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen
380 )
381
382 // The actual value to use as the maximum outbound requests.
383 func (cn *Peer) nominalMaxRequests() maxRequests {
384         return maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
385 }
386
387 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
388         ret = cn.cumulativeExpectedToReceiveChunks
389         if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
390                 ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
391         }
392         return
393 }
394
395 func (cn *Peer) setInterested(interested bool) bool {
396         if cn.requestState.Interested == interested {
397                 return true
398         }
399         cn.requestState.Interested = interested
400         if interested {
401                 cn.lastBecameInterested = time.Now()
402         } else if !cn.lastBecameInterested.IsZero() {
403                 cn.priorInterest += time.Since(cn.lastBecameInterested)
404         }
405         cn.updateExpectingChunks()
406         // log.Printf("%p: setting interest: %v", cn, interested)
407         return cn.writeInterested(interested)
408 }
409
410 // The function takes a message to be sent, and returns true if more messages
411 // are okay.
412 type messageWriter func(pp.Message) bool
413
414 // This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
415 // when we want to go fast.
416 func (cn *Peer) shouldRequest(r RequestIndex) error {
417         err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
418         if err != nil {
419                 return err
420         }
421         pi := cn.t.pieceIndexOfRequestIndex(r)
422         if cn.requestState.Cancelled.Contains(r) {
423                 return errors.New("request is cancelled and waiting acknowledgement")
424         }
425         if !cn.peerHasPiece(pi) {
426                 return errors.New("requesting piece peer doesn't have")
427         }
428         if !cn.t.peerIsActive(cn) {
429                 panic("requesting but not in active conns")
430         }
431         if cn.closed.IsSet() {
432                 panic("requesting when connection is closed")
433         }
434         if cn.t.hashingPiece(pi) {
435                 panic("piece is being hashed")
436         }
437         if cn.t.pieceQueuedForHash(pi) {
438                 panic("piece is queued for hash")
439         }
440         if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
441                 // This could occur if we made a request with the fast extension, and then got choked and
442                 // haven't had the request rejected yet.
443                 if !cn.requestState.Requests.Contains(r) {
444                         panic("peer choking and piece not allowed fast")
445                 }
446         }
447         return nil
448 }
449
450 func (cn *Peer) mustRequest(r RequestIndex) bool {
451         more, err := cn.request(r)
452         if err != nil {
453                 panic(err)
454         }
455         return more
456 }
457
458 func (cn *Peer) request(r RequestIndex) (more bool, err error) {
459         if err := cn.shouldRequest(r); err != nil {
460                 panic(err)
461         }
462         if cn.requestState.Requests.Contains(r) {
463                 return true, nil
464         }
465         if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
466                 return true, errors.New("too many outstanding requests")
467         }
468         cn.requestState.Requests.Add(r)
469         if cn.validReceiveChunks == nil {
470                 cn.validReceiveChunks = make(map[RequestIndex]int)
471         }
472         cn.validReceiveChunks[r]++
473         cn.t.requestState[r] = requestState{
474                 peer: cn,
475                 when: time.Now(),
476         }
477         cn.updateExpectingChunks()
478         ppReq := cn.t.requestIndexToRequest(r)
479         for _, f := range cn.callbacks.SentRequest {
480                 f(PeerRequestEvent{cn, ppReq})
481         }
482         return cn.peerImpl._request(ppReq), nil
483 }
484
485 func (me *Peer) cancel(r RequestIndex) {
486         if !me.deleteRequest(r) {
487                 panic("request not existing should have been guarded")
488         }
489         if me._cancel(r) {
490                 // Record that we expect to get a cancel ack.
491                 if !me.requestState.Cancelled.CheckedAdd(r) {
492                         panic("request already cancelled")
493                 }
494         }
495         me.decPeakRequests()
496         if me.isLowOnRequests() {
497                 me.updateRequests("Peer.cancel")
498         }
499 }
500
501 // Sets a reason to update requests, and if there wasn't already one, handle it.
502 func (cn *Peer) updateRequests(reason string) {
503         if cn.needRequestUpdate != "" {
504                 return
505         }
506         cn.needRequestUpdate = reason
507         cn.handleUpdateRequests()
508 }
509
510 // Emits the indices in the Bitmaps bms in order, never repeating any index.
511 // skip is mutated during execution, and its initial values will never be
512 // emitted.
513 func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
514         return func(cb iter.Callback) {
515                 for _, bm := range bms {
516                         if !iter.All(
517                                 func(_i interface{}) bool {
518                                         i := _i.(int)
519                                         if skip.Contains(bitmap.BitIndex(i)) {
520                                                 return true
521                                         }
522                                         skip.Add(bitmap.BitIndex(i))
523                                         return cb(i)
524                                 },
525                                 bm.Iter,
526                         ) {
527                                 return
528                         }
529                 }
530         }
531 }
532
533 // After handshake, we know what Torrent and Client stats to include for a
534 // connection.
535 func (cn *Peer) postHandshakeStats(f func(*ConnStats)) {
536         t := cn.t
537         f(&t.stats)
538         f(&t.cl.connStats)
539 }
540
541 // All ConnStats that include this connection. Some objects are not known
542 // until the handshake is complete, after which it's expected to reconcile the
543 // differences.
544 func (cn *Peer) allStats(f func(*ConnStats)) {
545         f(&cn._stats)
546         if cn.reconciledHandshakeStats {
547                 cn.postHandshakeStats(f)
548         }
549 }
550
551 func (cn *Peer) Stats() *ConnStats {
552         return cn.stats()
553 }
554
555 func (cn *Peer) CompletedString() string {
556        return cn.completedString()
557 }
558
559 func (cn *Peer) readBytes(n int64) {
560         cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
561 }
562
563 func (c *Peer) lastHelpful() (ret time.Time) {
564         ret = c.lastUsefulChunkReceived
565         if c.t.seeding() && c.lastChunkSent.After(ret) {
566                 ret = c.lastChunkSent
567         }
568         return
569 }
570
571 // Returns whether any part of the chunk would lie outside a piece of the given length.
572 func chunkOverflowsPiece(cs ChunkSpec, pieceLength pp.Integer) bool {
573         switch {
574         default:
575                 return false
576         case cs.Begin+cs.Length > pieceLength:
577         // Check for integer overflow
578         case cs.Begin > pp.IntegerMax-cs.Length:
579         }
580         return true
581 }
582
583 func runSafeExtraneous(f func()) {
584         if true {
585                 go f()
586         } else {
587                 f()
588         }
589 }
590
591 // Returns true if it was valid to reject the request.
592 func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
593         if c.deleteRequest(r) {
594                 c.decPeakRequests()
595         } else if !c.requestState.Cancelled.CheckedRemove(r) {
596                 return false
597         }
598         if c.isLowOnRequests() {
599                 c.updateRequests("Peer.remoteRejectedRequest")
600         }
601         c.decExpectedChunkReceive(r)
602         return true
603 }
604
605 func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
606         count := c.validReceiveChunks[r]
607         if count == 1 {
608                 delete(c.validReceiveChunks, r)
609         } else if count > 1 {
610                 c.validReceiveChunks[r] = count - 1
611         } else {
612                 panic(r)
613         }
614 }
615
616 func (c *Peer) doChunkReadStats(size int64) {
617         c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) })
618 }
619
620 // Handle a received chunk from a peer.
621 func (c *Peer) receiveChunk(msg *pp.Message) error {
622         chunksReceived.Add("total", 1)
623
624         ppReq := newRequestFromMessage(msg)
625         t := c.t
626         err := t.checkValidReceiveChunk(ppReq)
627         if err != nil {
628                 err = log.WithLevel(log.Warning, err)
629                 return err
630         }
631         req := c.t.requestIndexFromRequest(ppReq)
632
633         recordBlockForSmartBan := sync.OnceFunc(func() {
634                 c.recordBlockForSmartBan(req, msg.Piece)
635         })
636         // This needs to occur before we return, but we try to do it when the client is unlocked. It
637         // can't be done before checking if chunks are valid because they won't be deallocated by piece
638         // hashing if they're out of bounds.
639         defer recordBlockForSmartBan()
640
641         if c.peerChoking {
642                 chunksReceived.Add("while choked", 1)
643         }
644
645         if c.validReceiveChunks[req] <= 0 {
646                 chunksReceived.Add("unexpected", 1)
647                 return errors.New("received unexpected chunk")
648         }
649         c.decExpectedChunkReceive(req)
650
651         if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
652                 chunksReceived.Add("due to allowed fast", 1)
653         }
654
655         // The request needs to be deleted immediately to prevent cancels occurring asynchronously when
656         // have actually already received the piece, while we have the Client unlocked to write the data
657         // out.
658         intended := false
659         {
660                 if c.requestState.Requests.Contains(req) {
661                         for _, f := range c.callbacks.ReceivedRequested {
662                                 f(PeerMessageEvent{c, msg})
663                         }
664                 }
665                 // Request has been satisfied.
666                 if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) {
667                         intended = true
668                         if !c.peerChoking {
669                                 c._chunksReceivedWhileExpecting++
670                         }
671                         if c.isLowOnRequests() {
672                                 c.updateRequests("Peer.receiveChunk deleted request")
673                         }
674                 } else {
675                         chunksReceived.Add("unintended", 1)
676                 }
677         }
678
679         cl := t.cl
680
681         // Do we actually want this chunk?
682         if t.haveChunk(ppReq) {
683                 // panic(fmt.Sprintf("%+v", ppReq))
684                 chunksReceived.Add("redundant", 1)
685                 c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
686                 return nil
687         }
688
689         piece := &t.pieces[ppReq.Index]
690
691         c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
692         c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
693         if intended {
694                 c.piecesReceivedSinceLastRequestUpdate++
695                 c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
696         }
697         for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
698                 f(ReceivedUsefulDataEvent{c, msg})
699         }
700         c.lastUsefulChunkReceived = time.Now()
701
702         // Need to record that it hasn't been written yet, before we attempt to do
703         // anything with it.
704         piece.incrementPendingWrites()
705         // Record that we have the chunk, so we aren't trying to download it while
706         // waiting for it to be written to storage.
707         piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
708
709         // Cancel pending requests for this chunk from *other* peers.
710         if p := t.requestingPeer(req); p != nil {
711                 if p == c {
712                         panic("should not be pending request from conn that just received it")
713                 }
714                 p.cancel(req)
715         }
716
717         err = func() error {
718                 cl.unlock()
719                 defer cl.lock()
720                 // Opportunistically do this here while we aren't holding the client lock.
721                 recordBlockForSmartBan()
722                 concurrentChunkWrites.Add(1)
723                 defer concurrentChunkWrites.Add(-1)
724                 // Write the chunk out. Note that the upper bound on chunk writing concurrency will be the
725                 // number of connections. We write inline with receiving the chunk (with this lock dance),
726                 // because we want to handle errors synchronously and I haven't thought of a nice way to
727                 // defer any concurrency to the storage and have that notify the client of errors. TODO: Do
728                 // that instead.
729                 return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
730         }()
731
732         piece.decrementPendingWrites()
733
734         if err != nil {
735                 c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err)
736                 t.pendRequest(req)
737                 // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a
738                 // request update runs while we're writing the chunk that just failed. Then we never do a
739                 // fresh update after pending the failed request.
740                 c.updateRequests("Peer.receiveChunk error writing chunk")
741                 t.onWriteChunkErr(err)
742                 return nil
743         }
744
745         c.onDirtiedPiece(pieceIndex(ppReq.Index))
746
747         // We need to ensure the piece is only queued once, so only the last chunk writer gets this job.
748         if t.pieceAllDirty(pieceIndex(ppReq.Index)) && piece.pendingWrites == 0 {
749                 t.queuePieceCheck(pieceIndex(ppReq.Index))
750                 // We don't pend all chunks here anymore because we don't want code dependent on the dirty
751                 // chunk status (such as the haveChunk call above) to have to check all the various other
752                 // piece states like queued for hash, hashing etc. This does mean that we need to be sure
753                 // that chunk pieces are pended at an appropriate time later however.
754         }
755
756         cl.event.Broadcast()
757         // We do this because we've written a chunk, and may change PieceState.Partial.
758         t.publishPieceStateChange(pieceIndex(ppReq.Index))
759
760         return nil
761 }
762
763 func (c *Peer) onDirtiedPiece(piece pieceIndex) {
764         if c.peerTouchedPieces == nil {
765                 c.peerTouchedPieces = make(map[pieceIndex]struct{})
766         }
767         c.peerTouchedPieces[piece] = struct{}{}
768         ds := &c.t.pieces[piece].dirtiers
769         if *ds == nil {
770                 *ds = make(map[*Peer]struct{})
771         }
772         (*ds)[c] = struct{}{}
773 }
774
775 func (cn *Peer) netGoodPiecesDirtied() int64 {
776         return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
777 }
778
779 func (c *Peer) peerHasWantedPieces() bool {
780         if all, _ := c.peerHasAllPieces(); all {
781                 return !c.t.haveAllPieces() && !c.t._pendingPieces.IsEmpty()
782         }
783         if !c.t.haveInfo() {
784                 return !c.peerPieces().IsEmpty()
785         }
786         return c.peerPieces().Intersects(&c.t._pendingPieces)
787 }
788
789 // Returns true if an outstanding request is removed. Cancelled requests should be handled
790 // separately.
791 func (c *Peer) deleteRequest(r RequestIndex) bool {
792         if !c.requestState.Requests.CheckedRemove(r) {
793                 return false
794         }
795         for _, f := range c.callbacks.DeletedRequest {
796                 f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
797         }
798         c.updateExpectingChunks()
799         if c.t.requestingPeer(r) != c {
800                 panic("only one peer should have a given request at a time")
801         }
802         delete(c.t.requestState, r)
803         // c.t.iterPeers(func(p *Peer) {
804         //      if p.isLowOnRequests() {
805         //              p.updateRequests("Peer.deleteRequest")
806         //      }
807         // })
808         return true
809 }
810
811 func (c *Peer) deleteAllRequests(reason string) {
812         if c.requestState.Requests.IsEmpty() {
813                 return
814         }
815         c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
816                 if !c.deleteRequest(x) {
817                         panic("request should exist")
818                 }
819                 return true
820         })
821         c.assertNoRequests()
822         c.t.iterPeers(func(p *Peer) {
823                 if p.isLowOnRequests() {
824                         p.updateRequests(reason)
825                 }
826         })
827         return
828 }
829
830 func (c *Peer) assertNoRequests() {
831         if !c.requestState.Requests.IsEmpty() {
832                 panic(c.requestState.Requests.GetCardinality())
833         }
834 }
835
836 func (c *Peer) cancelAllRequests() {
837         c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
838                 c.cancel(x)
839                 return true
840         })
841         c.assertNoRequests()
842         return
843 }
844
845 func (c *Peer) peerPriority() (peerPriority, error) {
846         return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
847 }
848
849 func (c *Peer) remoteIp() net.IP {
850         host, _, _ := net.SplitHostPort(c.RemoteAddr.String())
851         return net.ParseIP(host)
852 }
853
854 func (c *Peer) remoteIpPort() IpPort {
855         ipa, _ := tryIpPortFromNetAddr(c.RemoteAddr)
856         return IpPort{ipa.IP, uint16(ipa.Port)}
857 }
858
859 func (c *Peer) trust() connectionTrust {
860         return connectionTrust{c.trusted, c.netGoodPiecesDirtied()}
861 }
862
863 type connectionTrust struct {
864         Implicit            bool
865         NetGoodPiecesDirted int64
866 }
867
868 func (l connectionTrust) Less(r connectionTrust) bool {
869         return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
870 }
871
872 // Returns a new Bitmap that includes bits for all pieces the peer could have based on their claims.
873 func (cn *Peer) newPeerPieces() *roaring.Bitmap {
874         // TODO: Can we use copy on write?
875         ret := cn.peerPieces().Clone()
876         if all, _ := cn.peerHasAllPieces(); all {
877                 if cn.t.haveInfo() {
878                         ret.AddRange(0, bitmap.BitRange(cn.t.numPieces()))
879                 } else {
880                         ret.AddRange(0, bitmap.ToEnd)
881                 }
882         }
883         return ret
884 }
885
886 func (cn *Peer) stats() *ConnStats {
887         return &cn._stats
888 }
889
890 func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
891         pc, ok := p.peerImpl.(*PeerConn)
892         return pc, ok
893 }
894
895 func (p *Peer) uncancelledRequests() uint64 {
896         return p.requestState.Requests.GetCardinality()
897 }
898
899 type peerLocalPublicAddr = IpPort
900
901 func (p *Peer) isLowOnRequests() bool {
902         return p.requestState.Requests.IsEmpty() && p.requestState.Cancelled.IsEmpty()
903 }
904
905 func (p *Peer) decPeakRequests() {
906         // // This can occur when peak requests are altered by the update request timer to be lower than
907         // // the actual number of outstanding requests. Let's let it go negative and see what happens. I
908         // // wonder what happens if maxRequests is not signed.
909         // if p.peakRequests < 1 {
910         //      panic(p.peakRequests)
911         // }
912         p.peakRequests--
913 }
914
915 func (p *Peer) recordBlockForSmartBan(req RequestIndex, blockData []byte) {
916         if p.bannableAddr.Ok {
917                 p.t.smartBanCache.RecordBlock(p.bannableAddr.Value, req, blockData)
918         }
919 }