]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
5d78f3902514d06222959e5ba8dd0f60a4e11449
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "math/rand"
12         "net/netip"
13         "net/url"
14         "sort"
15         "strings"
16         "text/tabwriter"
17         "time"
18         "unsafe"
19
20         "github.com/RoaringBitmap/roaring"
21         "github.com/anacrolix/chansync"
22         "github.com/anacrolix/chansync/events"
23         "github.com/anacrolix/dht/v2"
24         . "github.com/anacrolix/generics"
25         g "github.com/anacrolix/generics"
26         "github.com/anacrolix/log"
27         "github.com/anacrolix/missinggo/perf"
28         "github.com/anacrolix/missinggo/slices"
29         "github.com/anacrolix/missinggo/v2"
30         "github.com/anacrolix/missinggo/v2/bitmap"
31         "github.com/anacrolix/missinggo/v2/pubsub"
32         "github.com/anacrolix/multiless"
33         "github.com/anacrolix/sync"
34         "github.com/pion/datachannel"
35         "golang.org/x/exp/maps"
36
37         "github.com/anacrolix/torrent/bencode"
38         "github.com/anacrolix/torrent/common"
39         "github.com/anacrolix/torrent/internal/check"
40         "github.com/anacrolix/torrent/internal/nestedmaps"
41         "github.com/anacrolix/torrent/metainfo"
42         pp "github.com/anacrolix/torrent/peer_protocol"
43         utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
44         request_strategy "github.com/anacrolix/torrent/request-strategy"
45         "github.com/anacrolix/torrent/segments"
46         "github.com/anacrolix/torrent/storage"
47         "github.com/anacrolix/torrent/tracker"
48         typedRoaring "github.com/anacrolix/torrent/typed-roaring"
49         "github.com/anacrolix/torrent/webseed"
50         "github.com/anacrolix/torrent/webtorrent"
51 )
52
53 // Maintains state of torrent within a Client. Many methods should not be called before the info is
54 // available, see .Info and .GotInfo.
55 type Torrent struct {
56         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
57         // alignment. See #262.
58         stats  ConnStats
59         cl     *Client
60         logger log.Logger
61
62         networkingEnabled      chansync.Flag
63         dataDownloadDisallowed chansync.Flag
64         dataUploadDisallowed   bool
65         userOnWriteChunkErr    func(error)
66
67         closed   chansync.SetOnce
68         onClose  []func()
69         infoHash metainfo.Hash
70         pieces   []Piece
71
72         // The order pieces are requested if there's no stronger reason like availability or priority.
73         pieceRequestOrder []int
74         // Values are the piece indices that changed.
75         pieceStateChanges pubsub.PubSub[PieceStateChange]
76         // The size of chunks to request from peers over the wire. This is
77         // normally 16KiB by convention these days.
78         chunkSize pp.Integer
79         chunkPool sync.Pool
80         // Total length of the torrent in bytes. Stored because it's not O(1) to
81         // get this from the info dict.
82         _length Option[int64]
83
84         // The storage to open when the info dict becomes available.
85         storageOpener *storage.Client
86         // Storage for torrent data.
87         storage *storage.Torrent
88         // Read-locked for using storage, and write-locked for Closing.
89         storageLock sync.RWMutex
90
91         // TODO: Only announce stuff is used?
92         metainfo metainfo.MetaInfo
93
94         // The info dict. nil if we don't have it (yet).
95         info      *metainfo.Info
96         fileIndex segments.Index
97         files     *[]*File
98
99         _chunksPerRegularPiece chunkIndexType
100
101         webSeeds map[string]*Peer
102         // Active peer connections, running message stream loops. TODO: Make this
103         // open (not-closed) connections only.
104         conns               map[*PeerConn]struct{}
105         maxEstablishedConns int
106         // Set of addrs to which we're attempting to connect. Connections are
107         // half-open until all handshakes are completed.
108         halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo
109         // The final ess is not silent here as it's in the plural.
110         utHolepunchRendezvous map[netip.AddrPort]*utHolepunchRendezvous
111
112         // Reserve of peers to connect to. A peer can be both here and in the
113         // active connections if were told about the peer after connecting with
114         // them. That encourages us to reconnect to peers that are well known in
115         // the swarm.
116         peers prioritizedPeers
117         // Whether we want to know more peers.
118         wantPeersEvent missinggo.Event
119         // An announcer for each tracker URL.
120         trackerAnnouncers map[string]torrentTrackerAnnouncer
121         // How many times we've initiated a DHT announce. TODO: Move into stats.
122         numDHTAnnounces int
123
124         // Name used if the info name isn't available. Should be cleared when the
125         // Info does become available.
126         nameMu      sync.RWMutex
127         displayName string
128
129         // The bencoded bytes of the info dict. This is actively manipulated if
130         // the info bytes aren't initially available, and we try to fetch them
131         // from peers.
132         metadataBytes []byte
133         // Each element corresponds to the 16KiB metadata pieces. If true, we have
134         // received that piece.
135         metadataCompletedChunks []bool
136         metadataChanged         sync.Cond
137
138         // Closed when .Info is obtained.
139         gotMetainfoC chan struct{}
140
141         readers                map[*reader]struct{}
142         _readerNowPieces       bitmap.Bitmap
143         _readerReadaheadPieces bitmap.Bitmap
144
145         // A cache of pieces we need to get. Calculated from various piece and
146         // file priorities and completion states elsewhere.
147         _pendingPieces roaring.Bitmap
148         // A cache of completed piece indices.
149         _completedPieces roaring.Bitmap
150         // Pieces that need to be hashed.
151         piecesQueuedForHash       bitmap.Bitmap
152         activePieceHashes         int
153         initialPieceCheckDisabled bool
154
155         connsWithAllPieces map[*Peer]struct{}
156
157         requestState map[RequestIndex]requestState
158         // Chunks we've written to since the corresponding piece was last checked.
159         dirtyChunks typedRoaring.Bitmap[RequestIndex]
160
161         pex pexState
162
163         // Is On when all pieces are complete.
164         Complete chansync.Flag
165
166         // Torrent sources in use keyed by the source string.
167         activeSources sync.Map
168         sourcesLogger log.Logger
169
170         smartBanCache smartBanCache
171
172         // Large allocations reused between request state updates.
173         requestPieceStates []request_strategy.PieceRequestOrderState
174         requestIndexes     []RequestIndex
175 }
176
177 type outgoingConnAttemptKey = *PeerInfo
178
179 func (t *Torrent) length() int64 {
180         return t._length.Value
181 }
182
183 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
184         // This could be done with roaring.BitSliceIndexing.
185         t.iterPeers(func(peer *Peer) {
186                 if _, ok := t.connsWithAllPieces[peer]; ok {
187                         return
188                 }
189                 if peer.peerHasPiece(i) {
190                         count++
191                 }
192         })
193         return
194 }
195
196 func (t *Torrent) decPieceAvailability(i pieceIndex) {
197         if !t.haveInfo() {
198                 return
199         }
200         p := t.piece(i)
201         if p.relativeAvailability <= 0 {
202                 panic(p.relativeAvailability)
203         }
204         p.relativeAvailability--
205         t.updatePieceRequestOrder(i)
206 }
207
208 func (t *Torrent) incPieceAvailability(i pieceIndex) {
209         // If we don't the info, this should be reconciled when we do.
210         if t.haveInfo() {
211                 p := t.piece(i)
212                 p.relativeAvailability++
213                 t.updatePieceRequestOrder(i)
214         }
215 }
216
217 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
218         return t._readerNowPieces
219 }
220
221 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
222         return t._readerReadaheadPieces
223 }
224
225 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
226         return !t.wantPieceIndex(i)
227 }
228
229 // Returns a channel that is closed when the Torrent is closed.
230 func (t *Torrent) Closed() events.Done {
231         return t.closed.Done()
232 }
233
234 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
235 // pending, and half-open peers.
236 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
237         // Add pending peers to the list
238         t.peers.Each(func(peer PeerInfo) {
239                 ks = append(ks, peer)
240         })
241
242         // Add half-open peers to the list
243         for _, attempts := range t.halfOpen {
244                 for _, peer := range attempts {
245                         ks = append(ks, *peer)
246                 }
247         }
248
249         // Add active peers to the list
250         for conn := range t.conns {
251                 ks = append(ks, PeerInfo{
252                         Id:     conn.PeerID,
253                         Addr:   conn.RemoteAddr,
254                         Source: conn.Discovery,
255                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
256                         // > But if we're not connected to them with an encrypted connection, I couldn't say
257                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
258                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
259                         // > connection. It's probably easiest to do the latter for now.
260                         // https://github.com/anacrolix/torrent/pull/188
261                         SupportsEncryption: conn.headerEncrypted,
262                 })
263         }
264
265         return
266 }
267
268 func (t *Torrent) setChunkSize(size pp.Integer) {
269         t.chunkSize = size
270         t.chunkPool = sync.Pool{
271                 New: func() interface{} {
272                         b := make([]byte, size)
273                         return &b
274                 },
275         }
276 }
277
278 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
279         return t._completedPieces.Contains(bitmap.BitIndex(piece))
280 }
281
282 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
283         if t.storage == nil {
284                 return storage.Completion{Complete: false, Ok: true}
285         }
286         return t.pieces[piece].Storage().Completion()
287 }
288
289 // There's a connection to that address already.
290 func (t *Torrent) addrActive(addr string) bool {
291         if _, ok := t.halfOpen[addr]; ok {
292                 return true
293         }
294         for c := range t.conns {
295                 ra := c.RemoteAddr
296                 if ra.String() == addr {
297                         return true
298                 }
299         }
300         return false
301 }
302
303 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
304         return t.appendConns(ret, func(conn *PeerConn) bool {
305                 return !conn.closed.IsSet()
306         })
307 }
308
309 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
310         for c := range t.conns {
311                 if f(c) {
312                         ret = append(ret, c)
313                 }
314         }
315         return ret
316 }
317
318 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
319         cl := t.cl
320         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
321         if t.closed.IsSet() {
322                 return false
323         }
324         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
325                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
326                         torrent.Add("peers not added because of bad addr", 1)
327                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
328                         return false
329                 }
330         }
331         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
332                 torrent.Add("peers replaced", 1)
333                 if !replaced.equal(p) {
334                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
335                         added = true
336                 }
337         } else {
338                 added = true
339         }
340         t.openNewConns()
341         for t.peers.Len() > cl.config.TorrentPeersHighWater {
342                 _, ok := t.peers.DeleteMin()
343                 if ok {
344                         torrent.Add("excess reserve peers discarded", 1)
345                 }
346         }
347         return
348 }
349
350 func (t *Torrent) invalidateMetadata() {
351         for i := 0; i < len(t.metadataCompletedChunks); i++ {
352                 t.metadataCompletedChunks[i] = false
353         }
354         t.nameMu.Lock()
355         t.gotMetainfoC = make(chan struct{})
356         t.info = nil
357         t.nameMu.Unlock()
358 }
359
360 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
361         if t.haveInfo() {
362                 return
363         }
364         if index >= len(t.metadataCompletedChunks) {
365                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
366                 return
367         }
368         copy(t.metadataBytes[(1<<14)*index:], data)
369         t.metadataCompletedChunks[index] = true
370 }
371
372 func (t *Torrent) metadataPieceCount() int {
373         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
374 }
375
376 func (t *Torrent) haveMetadataPiece(piece int) bool {
377         if t.haveInfo() {
378                 return (1<<14)*piece < len(t.metadataBytes)
379         } else {
380                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
381         }
382 }
383
384 func (t *Torrent) metadataSize() int {
385         return len(t.metadataBytes)
386 }
387
388 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
389         for i := 0; i < len(info.Pieces); i += sha1.Size {
390                 ret = append(ret, info.Pieces[i:i+sha1.Size])
391         }
392         return
393 }
394
395 func (t *Torrent) makePieces() {
396         hashes := infoPieceHashes(t.info)
397         t.pieces = make([]Piece, len(hashes))
398         for i, hash := range hashes {
399                 piece := &t.pieces[i]
400                 piece.t = t
401                 piece.index = pieceIndex(i)
402                 piece.noPendingWrites.L = &piece.pendingWritesMutex
403                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
404                 files := *t.files
405                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
406                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
407                 piece.files = files[beginFile:endFile]
408         }
409 }
410
411 // Returns the index of the first file containing the piece. files must be
412 // ordered by offset.
413 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
414         for i, f := range files {
415                 if f.offset+f.length > pieceOffset {
416                         return i
417                 }
418         }
419         return 0
420 }
421
422 // Returns the index after the last file containing the piece. files must be
423 // ordered by offset.
424 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
425         for i, f := range files {
426                 if f.offset+f.length >= pieceEndOffset {
427                         return i + 1
428                 }
429         }
430         return 0
431 }
432
433 func (t *Torrent) cacheLength() {
434         var l int64
435         for _, f := range t.info.UpvertedFiles() {
436                 l += f.Length
437         }
438         t._length = Some(l)
439 }
440
441 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
442 // separately.
443 func (t *Torrent) setInfo(info *metainfo.Info) error {
444         if err := validateInfo(info); err != nil {
445                 return fmt.Errorf("bad info: %s", err)
446         }
447         if t.storageOpener != nil {
448                 var err error
449                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
450                 if err != nil {
451                         return fmt.Errorf("error opening torrent storage: %s", err)
452                 }
453         }
454         t.nameMu.Lock()
455         t.info = info
456         t.nameMu.Unlock()
457         t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
458         t.updateComplete()
459         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
460         t.displayName = "" // Save a few bytes lol.
461         t.initFiles()
462         t.cacheLength()
463         t.makePieces()
464         return nil
465 }
466
467 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
468         return request_strategy.PieceRequestOrderKey{
469                 InfoHash: t.infoHash,
470                 Index:    i,
471         }
472 }
473
474 // This seems to be all the follow-up tasks after info is set, that can't fail.
475 func (t *Torrent) onSetInfo() {
476         t.pieceRequestOrder = rand.Perm(t.numPieces())
477         t.initPieceRequestOrder()
478         MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
479         for i := range t.pieces {
480                 p := &t.pieces[i]
481                 // Need to add relativeAvailability before updating piece completion, as that may result in conns
482                 // being dropped.
483                 if p.relativeAvailability != 0 {
484                         panic(p.relativeAvailability)
485                 }
486                 p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
487                 t.addRequestOrderPiece(i)
488                 t.updatePieceCompletion(i)
489                 if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
490                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
491                         t.queuePieceCheck(i)
492                 }
493         }
494         t.cl.event.Broadcast()
495         close(t.gotMetainfoC)
496         t.updateWantPeersEvent()
497         t.requestState = make(map[RequestIndex]requestState)
498         t.tryCreateMorePieceHashers()
499         t.iterPeers(func(p *Peer) {
500                 p.onGotInfo(t.info)
501                 p.updateRequests("onSetInfo")
502         })
503 }
504
505 // Called when metadata for a torrent becomes available.
506 func (t *Torrent) setInfoBytesLocked(b []byte) error {
507         if metainfo.HashBytes(b) != t.infoHash {
508                 return errors.New("info bytes have wrong hash")
509         }
510         var info metainfo.Info
511         if err := bencode.Unmarshal(b, &info); err != nil {
512                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
513         }
514         t.metadataBytes = b
515         t.metadataCompletedChunks = nil
516         if t.info != nil {
517                 return nil
518         }
519         if err := t.setInfo(&info); err != nil {
520                 return err
521         }
522         t.onSetInfo()
523         return nil
524 }
525
526 func (t *Torrent) haveAllMetadataPieces() bool {
527         if t.haveInfo() {
528                 return true
529         }
530         if t.metadataCompletedChunks == nil {
531                 return false
532         }
533         for _, have := range t.metadataCompletedChunks {
534                 if !have {
535                         return false
536                 }
537         }
538         return true
539 }
540
541 // TODO: Propagate errors to disconnect peer.
542 func (t *Torrent) setMetadataSize(size int) (err error) {
543         if t.haveInfo() {
544                 // We already know the correct metadata size.
545                 return
546         }
547         if uint32(size) > maxMetadataSize {
548                 return log.WithLevel(log.Warning, errors.New("bad size"))
549         }
550         if len(t.metadataBytes) == size {
551                 return
552         }
553         t.metadataBytes = make([]byte, size)
554         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
555         t.metadataChanged.Broadcast()
556         for c := range t.conns {
557                 c.requestPendingMetadata()
558         }
559         return
560 }
561
562 // The current working name for the torrent. Either the name in the info dict,
563 // or a display name given such as by the dn value in a magnet link, or "".
564 func (t *Torrent) name() string {
565         t.nameMu.RLock()
566         defer t.nameMu.RUnlock()
567         if t.haveInfo() {
568                 return t.info.BestName()
569         }
570         if t.displayName != "" {
571                 return t.displayName
572         }
573         return "infohash:" + t.infoHash.HexString()
574 }
575
576 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
577         p := &t.pieces[index]
578         ret.Priority = t.piecePriority(index)
579         ret.Completion = p.completion()
580         ret.QueuedForHash = p.queuedForHash()
581         ret.Hashing = p.hashing
582         ret.Checking = ret.QueuedForHash || ret.Hashing
583         ret.Marking = p.marking
584         if !ret.Complete && t.piecePartiallyDownloaded(index) {
585                 ret.Partial = true
586         }
587         return
588 }
589
590 func (t *Torrent) metadataPieceSize(piece int) int {
591         return metadataPieceSize(len(t.metadataBytes), piece)
592 }
593
594 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
595         return pp.Message{
596                 Type:       pp.Extended,
597                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
598                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
599                         Piece:     piece,
600                         TotalSize: len(t.metadataBytes),
601                         Type:      msgType,
602                 }), data...),
603         }
604 }
605
606 type pieceAvailabilityRun struct {
607         Count        pieceIndex
608         Availability int
609 }
610
611 func (me pieceAvailabilityRun) String() string {
612         return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
613 }
614
615 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
616         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
617                 ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
618         })
619         for i := range t.pieces {
620                 rle.Append(t.pieces[i].availability(), 1)
621         }
622         rle.Flush()
623         return
624 }
625
626 func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
627         freqs = make([]int, t.numActivePeers()+1)
628         for i := range t.pieces {
629                 freqs[t.piece(i).availability()]++
630         }
631         return
632 }
633
634 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
635         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
636                 ret = append(ret, PieceStateRun{
637                         PieceState: el.(PieceState),
638                         Length:     int(count),
639                 })
640         })
641         for index := range t.pieces {
642                 rle.Append(t.pieceState(pieceIndex(index)), 1)
643         }
644         rle.Flush()
645         return
646 }
647
648 // Produces a small string representing a PieceStateRun.
649 func (psr PieceStateRun) String() (ret string) {
650         ret = fmt.Sprintf("%d", psr.Length)
651         ret += func() string {
652                 switch psr.Priority {
653                 case PiecePriorityNext:
654                         return "N"
655                 case PiecePriorityNormal:
656                         return "."
657                 case PiecePriorityReadahead:
658                         return "R"
659                 case PiecePriorityNow:
660                         return "!"
661                 case PiecePriorityHigh:
662                         return "H"
663                 default:
664                         return ""
665                 }
666         }()
667         if psr.Hashing {
668                 ret += "H"
669         }
670         if psr.QueuedForHash {
671                 ret += "Q"
672         }
673         if psr.Marking {
674                 ret += "M"
675         }
676         if psr.Partial {
677                 ret += "P"
678         }
679         if psr.Complete {
680                 ret += "C"
681         }
682         if !psr.Ok {
683                 ret += "?"
684         }
685         return
686 }
687
688 func (t *Torrent) writeStatus(w io.Writer) {
689         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
690         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
691         if !t.haveInfo() {
692                 fmt.Fprintf(w, "Metadata have: ")
693                 for _, h := range t.metadataCompletedChunks {
694                         fmt.Fprintf(w, "%c", func() rune {
695                                 if h {
696                                         return 'H'
697                                 } else {
698                                         return '.'
699                                 }
700                         }())
701                 }
702                 fmt.Fprintln(w)
703         }
704         fmt.Fprintf(w, "Piece length: %s\n",
705                 func() string {
706                         if t.haveInfo() {
707                                 return fmt.Sprintf("%v (%v chunks)",
708                                         t.usualPieceSize(),
709                                         float64(t.usualPieceSize())/float64(t.chunkSize))
710                         } else {
711                                 return "no info"
712                         }
713                 }(),
714         )
715         if t.info != nil {
716                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
717                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
718                 // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
719                 // availability frequencies instead.
720                 if false {
721                         fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
722                                 for _, run := range t.pieceAvailabilityRuns() {
723                                         ret = append(ret, run.String())
724                                 }
725                                 return
726                         }(), " "))
727                 }
728                 fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
729                         func() (ret []string) {
730                                 for avail, freq := range t.pieceAvailabilityFrequencies() {
731                                         if freq == 0 {
732                                                 continue
733                                         }
734                                         ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
735                                 }
736                                 return
737                         }(),
738                         ", "))
739         }
740         fmt.Fprintf(w, "Reader Pieces:")
741         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
742                 fmt.Fprintf(w, " %d:%d", begin, end)
743                 return true
744         })
745         fmt.Fprintln(w)
746
747         fmt.Fprintf(w, "Enabled trackers:\n")
748         func() {
749                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
750                 fmt.Fprintf(tw, "    URL\tExtra\n")
751                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
752                         lu := l.URL()
753                         ru := r.URL()
754                         var luns, runs url.URL = *lu, *ru
755                         luns.Scheme = ""
756                         runs.Scheme = ""
757                         var ml missinggo.MultiLess
758                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
759                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
760                         return ml.Less()
761                 }).([]torrentTrackerAnnouncer) {
762                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
763                 }
764                 tw.Flush()
765         }()
766
767         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
768
769         dumpStats(w, t.statsLocked())
770
771         fmt.Fprintf(w, "webseeds:\n")
772         t.writePeerStatuses(w, maps.Values(t.webSeeds))
773
774         peerConns := maps.Keys(t.conns)
775         // Peers without priorities first, then those with. I'm undecided about how to order peers
776         // without priorities.
777         sort.Slice(peerConns, func(li, ri int) bool {
778                 l := peerConns[li]
779                 r := peerConns[ri]
780                 ml := multiless.New()
781                 lpp := g.ResultFromTuple(l.peerPriority()).ToOption()
782                 rpp := g.ResultFromTuple(r.peerPriority()).ToOption()
783                 ml = ml.Bool(lpp.Ok, rpp.Ok)
784                 ml = ml.Uint32(rpp.Value, lpp.Value)
785                 return ml.Less()
786         })
787
788         fmt.Fprintf(w, "%v peer conns:\n", len(peerConns))
789         t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer {
790                 return &pc.Peer
791         }))
792 }
793
794 func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) {
795         var buf bytes.Buffer
796         for _, c := range peers {
797                 fmt.Fprintf(w, "- ")
798                 buf.Reset()
799                 c.writeStatus(&buf)
800                 w.Write(bytes.TrimRight(
801                         bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n  ")),
802                         " "))
803         }
804 }
805
806 func (t *Torrent) haveInfo() bool {
807         return t.info != nil
808 }
809
810 // Returns a run-time generated MetaInfo that includes the info bytes and
811 // announce-list as currently known to the client.
812 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
813         return metainfo.MetaInfo{
814                 CreationDate: time.Now().Unix(),
815                 Comment:      "dynamic metainfo from client",
816                 CreatedBy:    "go.torrent",
817                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
818                 InfoBytes: func() []byte {
819                         if t.haveInfo() {
820                                 return t.metadataBytes
821                         } else {
822                                 return nil
823                         }
824                 }(),
825                 UrlList: func() []string {
826                         ret := make([]string, 0, len(t.webSeeds))
827                         for url := range t.webSeeds {
828                                 ret = append(ret, url)
829                         }
830                         return ret
831                 }(),
832         }
833 }
834
835 // Get bytes left
836 func (t *Torrent) BytesMissing() (n int64) {
837         t.cl.rLock()
838         n = t.bytesMissingLocked()
839         t.cl.rUnlock()
840         return
841 }
842
843 func (t *Torrent) bytesMissingLocked() int64 {
844         return t.bytesLeft()
845 }
846
847 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
848         roaring.Flip(b, 0, end).Iterate(cb)
849 }
850
851 func (t *Torrent) bytesLeft() (left int64) {
852         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
853                 p := t.piece(pieceIndex(x))
854                 left += int64(p.length() - p.numDirtyBytes())
855                 return true
856         })
857         return
858 }
859
860 // Bytes left to give in tracker announces.
861 func (t *Torrent) bytesLeftAnnounce() int64 {
862         if t.haveInfo() {
863                 return t.bytesLeft()
864         } else {
865                 return -1
866         }
867 }
868
869 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
870         if t.pieceComplete(piece) {
871                 return false
872         }
873         if t.pieceAllDirty(piece) {
874                 return false
875         }
876         return t.pieces[piece].hasDirtyChunks()
877 }
878
879 func (t *Torrent) usualPieceSize() int {
880         return int(t.info.PieceLength)
881 }
882
883 func (t *Torrent) numPieces() pieceIndex {
884         return t.info.NumPieces()
885 }
886
887 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
888         return pieceIndex(t._completedPieces.GetCardinality())
889 }
890
891 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
892         if !t.closed.Set() {
893                 err = errors.New("already closed")
894                 return
895         }
896         for _, f := range t.onClose {
897                 f()
898         }
899         if t.storage != nil {
900                 wg.Add(1)
901                 go func() {
902                         defer wg.Done()
903                         t.storageLock.Lock()
904                         defer t.storageLock.Unlock()
905                         if f := t.storage.Close; f != nil {
906                                 err1 := f()
907                                 if err1 != nil {
908                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
909                                 }
910                         }
911                 }()
912         }
913         t.iterPeers(func(p *Peer) {
914                 p.close()
915         })
916         if t.storage != nil {
917                 t.deletePieceRequestOrder()
918         }
919         t.assertAllPiecesRelativeAvailabilityZero()
920         t.pex.Reset()
921         t.cl.event.Broadcast()
922         t.pieceStateChanges.Close()
923         t.updateWantPeersEvent()
924         return
925 }
926
927 func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
928         for i := range t.pieces {
929                 p := t.piece(i)
930                 if p.relativeAvailability != 0 {
931                         panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
932                 }
933         }
934 }
935
936 func (t *Torrent) requestOffset(r Request) int64 {
937         return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
938 }
939
940 // Return the request that would include the given offset into the torrent data. Returns !ok if
941 // there is no such request.
942 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
943         return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
944 }
945
946 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
947         defer perf.ScopeTimerErr(&err)()
948         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
949         if err == nil && n != len(data) {
950                 err = io.ErrShortWrite
951         }
952         return err
953 }
954
955 func (t *Torrent) bitfield() (bf []bool) {
956         bf = make([]bool, t.numPieces())
957         t._completedPieces.Iterate(func(piece uint32) (again bool) {
958                 bf[piece] = true
959                 return true
960         })
961         return
962 }
963
964 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
965         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
966 }
967
968 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
969         return t._chunksPerRegularPiece
970 }
971
972 func (t *Torrent) numChunks() RequestIndex {
973         if t.numPieces() == 0 {
974                 return 0
975         }
976         return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
977 }
978
979 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
980         t.dirtyChunks.RemoveRange(
981                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
982                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
983 }
984
985 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
986         if t.info.PieceLength == 0 {
987                 // There will be no variance amongst pieces. Only pain.
988                 return 0
989         }
990         if piece == t.numPieces()-1 {
991                 ret := pp.Integer(t.length() % t.info.PieceLength)
992                 if ret != 0 {
993                         return ret
994                 }
995         }
996         return pp.Integer(t.info.PieceLength)
997 }
998
999 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
1000         return &blockCheckingWriter{
1001                 cache:        &t.smartBanCache,
1002                 requestIndex: t.pieceRequestIndexOffset(piece),
1003                 chunkSize:    t.chunkSize.Int(),
1004         }
1005 }
1006
1007 func (t *Torrent) hashPiece(piece pieceIndex) (
1008         ret metainfo.Hash,
1009         // These are peers that sent us blocks that differ from what we hash here.
1010         differingPeers map[bannableAddr]struct{},
1011         err error,
1012 ) {
1013         p := t.piece(piece)
1014         p.waitNoPendingWrites()
1015         storagePiece := t.pieces[piece].Storage()
1016
1017         // Does the backend want to do its own hashing?
1018         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
1019                 var sum metainfo.Hash
1020                 // log.Printf("A piece decided to self-hash: %d", piece)
1021                 sum, err = i.SelfHash()
1022                 missinggo.CopyExact(&ret, sum)
1023                 return
1024         }
1025
1026         hash := pieceHash.New()
1027         const logPieceContents = false
1028         smartBanWriter := t.smartBanBlockCheckingWriter(piece)
1029         writers := []io.Writer{hash, smartBanWriter}
1030         var examineBuf bytes.Buffer
1031         if logPieceContents {
1032                 writers = append(writers, &examineBuf)
1033         }
1034         _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
1035         if logPieceContents {
1036                 t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
1037         }
1038         smartBanWriter.Flush()
1039         differingPeers = smartBanWriter.badPeers
1040         missinggo.CopyExact(&ret, hash.Sum(nil))
1041         return
1042 }
1043
1044 func (t *Torrent) haveAnyPieces() bool {
1045         return !t._completedPieces.IsEmpty()
1046 }
1047
1048 func (t *Torrent) haveAllPieces() bool {
1049         if !t.haveInfo() {
1050                 return false
1051         }
1052         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
1053 }
1054
1055 func (t *Torrent) havePiece(index pieceIndex) bool {
1056         return t.haveInfo() && t.pieceComplete(index)
1057 }
1058
1059 func (t *Torrent) maybeDropMutuallyCompletePeer(
1060         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
1061         // okay?
1062         p *Peer,
1063 ) {
1064         if !t.cl.config.DropMutuallyCompletePeers {
1065                 return
1066         }
1067         if !t.haveAllPieces() {
1068                 return
1069         }
1070         if all, known := p.peerHasAllPieces(); !(known && all) {
1071                 return
1072         }
1073         if p.useful() {
1074                 return
1075         }
1076         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
1077         p.drop()
1078 }
1079
1080 func (t *Torrent) haveChunk(r Request) (ret bool) {
1081         // defer func() {
1082         //      log.Println("have chunk", r, ret)
1083         // }()
1084         if !t.haveInfo() {
1085                 return false
1086         }
1087         if t.pieceComplete(pieceIndex(r.Index)) {
1088                 return true
1089         }
1090         p := &t.pieces[r.Index]
1091         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
1092 }
1093
1094 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
1095         return chunkIndexType(cs.Begin / chunkSize)
1096 }
1097
1098 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
1099         return t._pendingPieces.Contains(uint32(index))
1100 }
1101
1102 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1103 // conns (which is a map).
1104 var peerConnSlices sync.Pool
1105
1106 func getPeerConnSlice(cap int) []*PeerConn {
1107         getInterface := peerConnSlices.Get()
1108         if getInterface == nil {
1109                 return make([]*PeerConn, 0, cap)
1110         } else {
1111                 return getInterface.([]*PeerConn)[:0]
1112         }
1113 }
1114
1115 // Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
1116 // this is a frequent occurrence.
1117 func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
1118         sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
1119         f(sl)
1120         peerConnSlices.Put(sl)
1121 }
1122
1123 func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
1124         wcs := worseConnSlice{conns: sl}
1125         wcs.initKeys(opts)
1126         heap.Init(&wcs)
1127         for wcs.Len() != 0 {
1128                 c := heap.Pop(&wcs).(*PeerConn)
1129                 if opts.incomingIsBad && !c.outgoing {
1130                         return c
1131                 }
1132                 if opts.outgoingIsBad && c.outgoing {
1133                         return c
1134                 }
1135                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1136                         return c
1137                 }
1138                 // If the connection is in the worst half of the established
1139                 // connection quota and is older than a minute.
1140                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1141                         // Give connections 1 minute to prove themselves.
1142                         if time.Since(c.completedHandshake) > time.Minute {
1143                                 return c
1144                         }
1145                 }
1146         }
1147         return nil
1148 }
1149
1150 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1151 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1152 // established connections for more than a minute. This is O(n log n). If there was a way to not
1153 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1154 func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
1155         t.withUnclosedConns(func(ucs []*PeerConn) {
1156                 ret = t.worstBadConnFromSlice(opts, ucs)
1157         })
1158         return
1159 }
1160
1161 type PieceStateChange struct {
1162         Index int
1163         PieceState
1164 }
1165
1166 func (t *Torrent) publishPieceChange(piece pieceIndex) {
1167         t.cl._mu.Defer(func() {
1168                 cur := t.pieceState(piece)
1169                 p := &t.pieces[piece]
1170                 if cur != p.publicPieceState {
1171                         p.publicPieceState = cur
1172                         t.pieceStateChanges.Publish(PieceStateChange{
1173                                 int(piece),
1174                                 cur,
1175                         })
1176                 }
1177         })
1178 }
1179
1180 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1181         if t.pieceComplete(piece) {
1182                 return 0
1183         }
1184         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1185 }
1186
1187 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1188         return t.pieces[piece].allChunksDirty()
1189 }
1190
1191 func (t *Torrent) readersChanged() {
1192         t.updateReaderPieces()
1193         t.updateAllPiecePriorities("Torrent.readersChanged")
1194 }
1195
1196 func (t *Torrent) updateReaderPieces() {
1197         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1198 }
1199
1200 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1201         if from == to {
1202                 return
1203         }
1204         t.updateReaderPieces()
1205         // Order the ranges, high and low.
1206         l, h := from, to
1207         if l.begin > h.begin {
1208                 l, h = h, l
1209         }
1210         if l.end < h.begin {
1211                 // Two distinct ranges.
1212                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1213                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1214         } else {
1215                 // Ranges overlap.
1216                 end := l.end
1217                 if h.end > end {
1218                         end = h.end
1219                 }
1220                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1221         }
1222 }
1223
1224 func (t *Torrent) maybeNewConns() {
1225         // Tickle the accept routine.
1226         t.cl.event.Broadcast()
1227         t.openNewConns()
1228 }
1229
1230 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
1231         if t._pendingPieces.Contains(uint32(piece)) {
1232                 t.iterPeers(func(c *Peer) {
1233                         // if c.requestState.Interested {
1234                         //      return
1235                         // }
1236                         if !c.isLowOnRequests() {
1237                                 return
1238                         }
1239                         if !c.peerHasPiece(piece) {
1240                                 return
1241                         }
1242                         if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
1243                                 return
1244                         }
1245                         c.updateRequests(reason)
1246                 })
1247         }
1248         t.maybeNewConns()
1249         t.publishPieceChange(piece)
1250 }
1251
1252 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1253         if !t.closed.IsSet() {
1254                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1255                 // request order.
1256                 t.updatePieceRequestOrder(piece)
1257         }
1258         p := &t.pieces[piece]
1259         newPrio := p.uncachedPriority()
1260         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1261         if newPrio == PiecePriorityNone {
1262                 if !t._pendingPieces.CheckedRemove(uint32(piece)) {
1263                         return
1264                 }
1265         } else {
1266                 if !t._pendingPieces.CheckedAdd(uint32(piece)) {
1267                         return
1268                 }
1269         }
1270         t.piecePriorityChanged(piece, reason)
1271 }
1272
1273 func (t *Torrent) updateAllPiecePriorities(reason string) {
1274         t.updatePiecePriorities(0, t.numPieces(), reason)
1275 }
1276
1277 // Update all piece priorities in one hit. This function should have the same
1278 // output as updatePiecePriority, but across all pieces.
1279 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1280         for i := begin; i < end; i++ {
1281                 t.updatePiecePriority(i, reason)
1282         }
1283 }
1284
1285 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1286 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1287         if off >= t.length() {
1288                 return
1289         }
1290         if off < 0 {
1291                 size += off
1292                 off = 0
1293         }
1294         if size <= 0 {
1295                 return
1296         }
1297         begin = pieceIndex(off / t.info.PieceLength)
1298         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1299         if end > pieceIndex(t.info.NumPieces()) {
1300                 end = pieceIndex(t.info.NumPieces())
1301         }
1302         return
1303 }
1304
1305 // Returns true if all iterations complete without breaking. Returns the read regions for all
1306 // readers. The reader regions should not be merged as some callers depend on this method to
1307 // enumerate readers.
1308 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1309         for r := range t.readers {
1310                 p := r.pieces
1311                 if p.begin >= p.end {
1312                         continue
1313                 }
1314                 if !f(p.begin, p.end) {
1315                         return false
1316                 }
1317         }
1318         return true
1319 }
1320
1321 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1322         return t.piece(piece).uncachedPriority()
1323 }
1324
1325 func (t *Torrent) pendRequest(req RequestIndex) {
1326         t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
1327 }
1328
1329 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1330         t.cl.event.Broadcast()
1331         if t.pieceComplete(piece) {
1332                 t.onPieceCompleted(piece)
1333         } else {
1334                 t.onIncompletePiece(piece)
1335         }
1336         t.updatePiecePriority(piece, reason)
1337 }
1338
1339 func (t *Torrent) numReceivedConns() (ret int) {
1340         for c := range t.conns {
1341                 if c.Discovery == PeerSourceIncoming {
1342                         ret++
1343                 }
1344         }
1345         return
1346 }
1347
1348 func (t *Torrent) numOutgoingConns() (ret int) {
1349         for c := range t.conns {
1350                 if c.outgoing {
1351                         ret++
1352                 }
1353         }
1354         return
1355 }
1356
1357 func (t *Torrent) maxHalfOpen() int {
1358         // Note that if we somehow exceed the maximum established conns, we want
1359         // the negative value to have an effect.
1360         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1361         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1362         // We want to allow some experimentation with new peers, and to try to
1363         // upset an oversupply of received connections.
1364         return int(min(
1365                 max(5, extraIncoming)+establishedHeadroom,
1366                 int64(t.cl.config.HalfOpenConnsPerTorrent),
1367         ))
1368 }
1369
1370 func (t *Torrent) openNewConns() (initiated int) {
1371         defer t.updateWantPeersEvent()
1372         for t.peers.Len() != 0 {
1373                 if !t.wantOutgoingConns() {
1374                         return
1375                 }
1376                 if len(t.halfOpen) >= t.maxHalfOpen() {
1377                         return
1378                 }
1379                 if len(t.cl.dialers) == 0 {
1380                         return
1381                 }
1382                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1383                         return
1384                 }
1385                 p := t.peers.PopMax()
1386                 t.initiateConn(p, false, false, false, false)
1387                 initiated++
1388         }
1389         return
1390 }
1391
1392 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1393         p := t.piece(piece)
1394         uncached := t.pieceCompleteUncached(piece)
1395         cached := p.completion()
1396         changed := cached != uncached
1397         complete := uncached.Complete
1398         p.storageCompletionOk = uncached.Ok
1399         x := uint32(piece)
1400         if complete {
1401                 t._completedPieces.Add(x)
1402                 t.openNewConns()
1403         } else {
1404                 t._completedPieces.Remove(x)
1405         }
1406         p.t.updatePieceRequestOrder(piece)
1407         t.updateComplete()
1408         if complete && len(p.dirtiers) != 0 {
1409                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1410         }
1411         if changed {
1412                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger)
1413                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1414         }
1415         return changed
1416 }
1417
1418 // Non-blocking read. Client lock is not required.
1419 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1420         for len(b) != 0 {
1421                 p := &t.pieces[off/t.info.PieceLength]
1422                 p.waitNoPendingWrites()
1423                 var n1 int
1424                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1425                 if n1 == 0 {
1426                         break
1427                 }
1428                 off += int64(n1)
1429                 n += n1
1430                 b = b[n1:]
1431         }
1432         return
1433 }
1434
1435 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1436 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1437 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1438 func (t *Torrent) maybeCompleteMetadata() error {
1439         if t.haveInfo() {
1440                 // Nothing to do.
1441                 return nil
1442         }
1443         if !t.haveAllMetadataPieces() {
1444                 // Don't have enough metadata pieces.
1445                 return nil
1446         }
1447         err := t.setInfoBytesLocked(t.metadataBytes)
1448         if err != nil {
1449                 t.invalidateMetadata()
1450                 return fmt.Errorf("error setting info bytes: %s", err)
1451         }
1452         if t.cl.config.Debug {
1453                 t.logger.Printf("%s: got metadata from peers", t)
1454         }
1455         return nil
1456 }
1457
1458 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1459         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1460                 if end > begin {
1461                         now.Add(bitmap.BitIndex(begin))
1462                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1463                 }
1464                 return true
1465         })
1466         return
1467 }
1468
1469 func (t *Torrent) needData() bool {
1470         if t.closed.IsSet() {
1471                 return false
1472         }
1473         if !t.haveInfo() {
1474                 return true
1475         }
1476         return !t._pendingPieces.IsEmpty()
1477 }
1478
1479 func appendMissingStrings(old, new []string) (ret []string) {
1480         ret = old
1481 new:
1482         for _, n := range new {
1483                 for _, o := range old {
1484                         if o == n {
1485                                 continue new
1486                         }
1487                 }
1488                 ret = append(ret, n)
1489         }
1490         return
1491 }
1492
1493 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1494         ret = existing
1495         for minNumTiers > len(ret) {
1496                 ret = append(ret, nil)
1497         }
1498         return
1499 }
1500
1501 func (t *Torrent) addTrackers(announceList [][]string) {
1502         fullAnnounceList := &t.metainfo.AnnounceList
1503         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1504         for tierIndex, trackerURLs := range announceList {
1505                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1506         }
1507         t.startMissingTrackerScrapers()
1508         t.updateWantPeersEvent()
1509 }
1510
1511 // Don't call this before the info is available.
1512 func (t *Torrent) bytesCompleted() int64 {
1513         if !t.haveInfo() {
1514                 return 0
1515         }
1516         return t.length() - t.bytesLeft()
1517 }
1518
1519 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1520         t.cl.lock()
1521         defer t.cl.unlock()
1522         return t.setInfoBytesLocked(b)
1523 }
1524
1525 // Returns true if connection is removed from torrent.Conns.
1526 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1527         if !c.closed.IsSet() {
1528                 panic("connection is not closed")
1529                 // There are behaviours prevented by the closed state that will fail
1530                 // if the connection has been deleted.
1531         }
1532         _, ret = t.conns[c]
1533         delete(t.conns, c)
1534         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1535         // the drop event against the PexConnState instead.
1536         if ret {
1537                 if !t.cl.config.DisablePEX {
1538                         t.pex.Drop(c)
1539                 }
1540         }
1541         torrent.Add("deleted connections", 1)
1542         c.deleteAllRequests("Torrent.deletePeerConn")
1543         t.assertPendingRequests()
1544         if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
1545                 panic(t.connsWithAllPieces)
1546         }
1547         return
1548 }
1549
1550 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1551         if t.deleteConnWithAllPieces(p) {
1552                 return
1553         }
1554         if !t.haveInfo() {
1555                 return
1556         }
1557         p.peerPieces().Iterate(func(i uint32) bool {
1558                 p.t.decPieceAvailability(pieceIndex(i))
1559                 return true
1560         })
1561 }
1562
1563 func (t *Torrent) assertPendingRequests() {
1564         if !check.Enabled {
1565                 return
1566         }
1567         // var actual pendingRequests
1568         // if t.haveInfo() {
1569         //      actual.m = make([]int, t.numChunks())
1570         // }
1571         // t.iterPeers(func(p *Peer) {
1572         //      p.requestState.Requests.Iterate(func(x uint32) bool {
1573         //              actual.Inc(x)
1574         //              return true
1575         //      })
1576         // })
1577         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1578         // if diff != "" {
1579         //      panic(diff)
1580         // }
1581 }
1582
1583 func (t *Torrent) dropConnection(c *PeerConn) {
1584         t.cl.event.Broadcast()
1585         c.close()
1586         if t.deletePeerConn(c) {
1587                 t.openNewConns()
1588         }
1589 }
1590
1591 // Peers as in contact information for dialing out.
1592 func (t *Torrent) wantPeers() bool {
1593         if t.closed.IsSet() {
1594                 return false
1595         }
1596         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1597                 return false
1598         }
1599         return t.wantOutgoingConns()
1600 }
1601
1602 func (t *Torrent) updateWantPeersEvent() {
1603         if t.wantPeers() {
1604                 t.wantPeersEvent.Set()
1605         } else {
1606                 t.wantPeersEvent.Clear()
1607         }
1608 }
1609
1610 // Returns whether the client should make effort to seed the torrent.
1611 func (t *Torrent) seeding() bool {
1612         cl := t.cl
1613         if t.closed.IsSet() {
1614                 return false
1615         }
1616         if t.dataUploadDisallowed {
1617                 return false
1618         }
1619         if cl.config.NoUpload {
1620                 return false
1621         }
1622         if !cl.config.Seed {
1623                 return false
1624         }
1625         if cl.config.DisableAggressiveUpload && t.needData() {
1626                 return false
1627         }
1628         return true
1629 }
1630
1631 func (t *Torrent) onWebRtcConn(
1632         c datachannel.ReadWriteCloser,
1633         dcc webtorrent.DataChannelContext,
1634 ) {
1635         defer c.Close()
1636         netConn := webrtcNetConn{
1637                 ReadWriteCloser:    c,
1638                 DataChannelContext: dcc,
1639         }
1640         peerRemoteAddr := netConn.RemoteAddr()
1641         //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
1642         if t.cl.badPeerAddr(peerRemoteAddr) {
1643                 return
1644         }
1645         localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
1646         pc, err := t.cl.initiateProtocolHandshakes(
1647                 context.Background(),
1648                 netConn,
1649                 t,
1650                 false,
1651                 newConnectionOpts{
1652                         outgoing:        dcc.LocalOffered,
1653                         remoteAddr:      peerRemoteAddr,
1654                         localPublicAddr: localAddrIpPort,
1655                         network:         webrtcNetwork,
1656                         connString:      fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
1657                 },
1658         )
1659         if err != nil {
1660                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1661                 return
1662         }
1663         if dcc.LocalOffered {
1664                 pc.Discovery = PeerSourceTracker
1665         } else {
1666                 pc.Discovery = PeerSourceIncoming
1667         }
1668         pc.conn.SetWriteDeadline(time.Time{})
1669         t.cl.lock()
1670         defer t.cl.unlock()
1671         err = t.cl.runHandshookConn(pc, t)
1672         if err != nil {
1673                 t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
1674         }
1675 }
1676
1677 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1678         err := t.cl.runHandshookConn(pc, t)
1679         if err != nil || logAll {
1680                 t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
1681         }
1682 }
1683
1684 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1685         t.logRunHandshookConn(pc, false, log.Debug)
1686 }
1687
1688 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1689         wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash)
1690         // This needs to run before the Torrent is dropped from the Client, to prevent a new webtorrent.TrackerClient for
1691         // the same info hash before the old one is cleaned up.
1692         t.onClose = append(t.onClose, release)
1693         wst := websocketTrackerStatus{u, wtc}
1694         go func() {
1695                 err := wtc.Announce(tracker.Started, t.infoHash)
1696                 if err != nil {
1697                         t.logger.WithDefaultLevel(log.Warning).Printf(
1698                                 "error in initial announce to %q: %v",
1699                                 u.String(), err,
1700                         )
1701                 }
1702         }()
1703         return wst
1704 }
1705
1706 func (t *Torrent) startScrapingTracker(_url string) {
1707         if _url == "" {
1708                 return
1709         }
1710         u, err := url.Parse(_url)
1711         if err != nil {
1712                 // URLs with a leading '*' appear to be a uTorrent convention to
1713                 // disable trackers.
1714                 if _url[0] != '*' {
1715                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1716                 }
1717                 return
1718         }
1719         if u.Scheme == "udp" {
1720                 u.Scheme = "udp4"
1721                 t.startScrapingTracker(u.String())
1722                 u.Scheme = "udp6"
1723                 t.startScrapingTracker(u.String())
1724                 return
1725         }
1726         if _, ok := t.trackerAnnouncers[_url]; ok {
1727                 return
1728         }
1729         sl := func() torrentTrackerAnnouncer {
1730                 switch u.Scheme {
1731                 case "ws", "wss":
1732                         if t.cl.config.DisableWebtorrent {
1733                                 return nil
1734                         }
1735                         return t.startWebsocketAnnouncer(*u)
1736                 case "udp4":
1737                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1738                                 return nil
1739                         }
1740                 case "udp6":
1741                         if t.cl.config.DisableIPv6 {
1742                                 return nil
1743                         }
1744                 }
1745                 newAnnouncer := &trackerScraper{
1746                         u:               *u,
1747                         t:               t,
1748                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1749                 }
1750                 go newAnnouncer.Run()
1751                 return newAnnouncer
1752         }()
1753         if sl == nil {
1754                 return
1755         }
1756         if t.trackerAnnouncers == nil {
1757                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1758         }
1759         t.trackerAnnouncers[_url] = sl
1760 }
1761
1762 // Adds and starts tracker scrapers for tracker URLs that aren't already
1763 // running.
1764 func (t *Torrent) startMissingTrackerScrapers() {
1765         if t.cl.config.DisableTrackers {
1766                 return
1767         }
1768         t.startScrapingTracker(t.metainfo.Announce)
1769         for _, tier := range t.metainfo.AnnounceList {
1770                 for _, url := range tier {
1771                         t.startScrapingTracker(url)
1772                 }
1773         }
1774 }
1775
1776 // Returns an AnnounceRequest with fields filled out to defaults and current
1777 // values.
1778 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1779         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1780         // dependent on the network in use.
1781         return tracker.AnnounceRequest{
1782                 Event: event,
1783                 NumWant: func() int32 {
1784                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1785                                 return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764
1786                         } else {
1787                                 return 0
1788                         }
1789                 }(),
1790                 Port:     uint16(t.cl.incomingPeerPort()),
1791                 PeerId:   t.cl.peerID,
1792                 InfoHash: t.infoHash,
1793                 Key:      t.cl.announceKey(),
1794
1795                 // The following are vaguely described in BEP 3.
1796
1797                 Left:     t.bytesLeftAnnounce(),
1798                 Uploaded: t.stats.BytesWrittenData.Int64(),
1799                 // There's no mention of wasted or unwanted download in the BEP.
1800                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1801         }
1802 }
1803
1804 // Adds peers revealed in an announce until the announce ends, or we have
1805 // enough peers.
1806 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1807         cl := t.cl
1808         for v := range pvs {
1809                 cl.lock()
1810                 added := 0
1811                 for _, cp := range v.Peers {
1812                         if cp.Port == 0 {
1813                                 // Can't do anything with this.
1814                                 continue
1815                         }
1816                         if t.addPeer(PeerInfo{
1817                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1818                                 Source: PeerSourceDhtGetPeers,
1819                         }) {
1820                                 added++
1821                         }
1822                 }
1823                 cl.unlock()
1824                 // if added != 0 {
1825                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1826                 // }
1827         }
1828 }
1829
1830 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1831 // announce ends. stop will force the announce to end.
1832 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1833         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1834         if err != nil {
1835                 return
1836         }
1837         _done := make(chan struct{})
1838         done = _done
1839         stop = ps.Close
1840         go func() {
1841                 t.consumeDhtAnnouncePeers(ps.Peers())
1842                 close(_done)
1843         }()
1844         return
1845 }
1846
1847 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1848         _, stop, err := t.AnnounceToDht(s)
1849         if err != nil {
1850                 return err
1851         }
1852         select {
1853         case <-t.closed.Done():
1854         case <-time.After(5 * time.Minute):
1855         }
1856         stop()
1857         return nil
1858 }
1859
1860 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1861         cl := t.cl
1862         cl.lock()
1863         defer cl.unlock()
1864         for {
1865                 for {
1866                         if t.closed.IsSet() {
1867                                 return
1868                         }
1869                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
1870                         // TODO: We can include the announce_peer step depending on whether we can receive
1871                         // inbound connections. We should probably only announce once every 15 mins too.
1872                         if !t.wantAnyConns() {
1873                                 goto wait
1874                         }
1875                         // TODO: Determine if there's a listener on the port we're announcing.
1876                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1877                                 goto wait
1878                         }
1879                         break
1880                 wait:
1881                         cl.event.Wait()
1882                 }
1883                 func() {
1884                         t.numDHTAnnounces++
1885                         cl.unlock()
1886                         defer cl.lock()
1887                         err := t.timeboxedAnnounceToDht(s)
1888                         if err != nil {
1889                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1890                         }
1891                 }()
1892         }
1893 }
1894
1895 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1896         for _, p := range peers {
1897                 if t.addPeer(p) {
1898                         added++
1899                 }
1900         }
1901         return
1902 }
1903
1904 // The returned TorrentStats may require alignment in memory. See
1905 // https://github.com/anacrolix/torrent/issues/383.
1906 func (t *Torrent) Stats() TorrentStats {
1907         t.cl.rLock()
1908         defer t.cl.rUnlock()
1909         return t.statsLocked()
1910 }
1911
1912 func (t *Torrent) statsLocked() (ret TorrentStats) {
1913         ret.ActivePeers = len(t.conns)
1914         ret.HalfOpenPeers = len(t.halfOpen)
1915         ret.PendingPeers = t.peers.Len()
1916         ret.TotalPeers = t.numTotalPeers()
1917         ret.ConnectedSeeders = 0
1918         for c := range t.conns {
1919                 if all, ok := c.peerHasAllPieces(); all && ok {
1920                         ret.ConnectedSeeders++
1921                 }
1922         }
1923         ret.ConnStats = t.stats.Copy()
1924         ret.PiecesComplete = t.numPiecesCompleted()
1925         return
1926 }
1927
1928 // The total number of peers in the torrent.
1929 func (t *Torrent) numTotalPeers() int {
1930         peers := make(map[string]struct{})
1931         for conn := range t.conns {
1932                 ra := conn.conn.RemoteAddr()
1933                 if ra == nil {
1934                         // It's been closed and doesn't support RemoteAddr.
1935                         continue
1936                 }
1937                 peers[ra.String()] = struct{}{}
1938         }
1939         for addr := range t.halfOpen {
1940                 peers[addr] = struct{}{}
1941         }
1942         t.peers.Each(func(peer PeerInfo) {
1943                 peers[peer.Addr.String()] = struct{}{}
1944         })
1945         return len(peers)
1946 }
1947
1948 // Reconcile bytes transferred before connection was associated with a
1949 // torrent.
1950 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1951         if c._stats != (ConnStats{
1952                 // Handshakes should only increment these fields:
1953                 BytesWritten: c._stats.BytesWritten,
1954                 BytesRead:    c._stats.BytesRead,
1955         }) {
1956                 panic("bad stats")
1957         }
1958         c.postHandshakeStats(func(cs *ConnStats) {
1959                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1960                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1961         })
1962         c.reconciledHandshakeStats = true
1963 }
1964
1965 // Returns true if the connection is added.
1966 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1967         defer func() {
1968                 if err == nil {
1969                         torrent.Add("added connections", 1)
1970                 }
1971         }()
1972         if t.closed.IsSet() {
1973                 return errors.New("torrent closed")
1974         }
1975         for c0 := range t.conns {
1976                 if c.PeerID != c0.PeerID {
1977                         continue
1978                 }
1979                 if !t.cl.config.DropDuplicatePeerIds {
1980                         continue
1981                 }
1982                 if c.hasPreferredNetworkOver(c0) {
1983                         c0.close()
1984                         t.deletePeerConn(c0)
1985                 } else {
1986                         return errors.New("existing connection preferred")
1987                 }
1988         }
1989         if len(t.conns) >= t.maxEstablishedConns {
1990                 numOutgoing := t.numOutgoingConns()
1991                 numIncoming := len(t.conns) - numOutgoing
1992                 c := t.worstBadConn(worseConnLensOpts{
1993                         // We've already established that we have too many connections at this point, so we just
1994                         // need to match what kind we have too many of vs. what we're trying to add now.
1995                         incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
1996                         outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
1997                 })
1998                 if c == nil {
1999                         return errors.New("don't want conn")
2000                 }
2001                 c.close()
2002                 t.deletePeerConn(c)
2003         }
2004         if len(t.conns) >= t.maxEstablishedConns {
2005                 panic(len(t.conns))
2006         }
2007         t.conns[c] = struct{}{}
2008         t.cl.event.Broadcast()
2009         // We'll never receive the "p" extended handshake parameter.
2010         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
2011                 t.pex.Add(c)
2012         }
2013         return nil
2014 }
2015
2016 func (t *Torrent) newConnsAllowed() bool {
2017         if !t.networkingEnabled.Bool() {
2018                 return false
2019         }
2020         if t.closed.IsSet() {
2021                 return false
2022         }
2023         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2024                 return false
2025         }
2026         return true
2027 }
2028
2029 func (t *Torrent) wantAnyConns() bool {
2030         if !t.networkingEnabled.Bool() {
2031                 return false
2032         }
2033         if t.closed.IsSet() {
2034                 return false
2035         }
2036         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2037                 return false
2038         }
2039         return len(t.conns) < t.maxEstablishedConns
2040 }
2041
2042 func (t *Torrent) wantOutgoingConns() bool {
2043         if !t.newConnsAllowed() {
2044                 return false
2045         }
2046         if len(t.conns) < t.maxEstablishedConns {
2047                 return true
2048         }
2049         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2050         return t.worstBadConn(worseConnLensOpts{
2051                 incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
2052                 outgoingIsBad: false,
2053         }) != nil
2054 }
2055
2056 func (t *Torrent) wantIncomingConns() bool {
2057         if !t.newConnsAllowed() {
2058                 return false
2059         }
2060         if len(t.conns) < t.maxEstablishedConns {
2061                 return true
2062         }
2063         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2064         return t.worstBadConn(worseConnLensOpts{
2065                 incomingIsBad: false,
2066                 outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
2067         }) != nil
2068 }
2069
2070 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
2071         t.cl.lock()
2072         defer t.cl.unlock()
2073         oldMax = t.maxEstablishedConns
2074         t.maxEstablishedConns = max
2075         wcs := worseConnSlice{
2076                 conns: t.appendConns(nil, func(*PeerConn) bool {
2077                         return true
2078                 }),
2079         }
2080         wcs.initKeys(worseConnLensOpts{})
2081         heap.Init(&wcs)
2082         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
2083                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
2084         }
2085         t.openNewConns()
2086         return oldMax
2087 }
2088
2089 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
2090         t.logger.LazyLog(log.Debug, func() log.Msg {
2091                 return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
2092         })
2093         p := t.piece(piece)
2094         p.numVerifies++
2095         t.cl.event.Broadcast()
2096         if t.closed.IsSet() {
2097                 return
2098         }
2099
2100         // Don't score the first time a piece is hashed, it could be an initial check.
2101         if p.storageCompletionOk {
2102                 if passed {
2103                         pieceHashedCorrect.Add(1)
2104                 } else {
2105                         log.Fmsg(
2106                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
2107                         ).AddValues(t, p).LogLevel(
2108
2109                                 log.Debug, t.logger)
2110
2111                         pieceHashedNotCorrect.Add(1)
2112                 }
2113         }
2114
2115         p.marking = true
2116         t.publishPieceChange(piece)
2117         defer func() {
2118                 p.marking = false
2119                 t.publishPieceChange(piece)
2120         }()
2121
2122         if passed {
2123                 if len(p.dirtiers) != 0 {
2124                         // Don't increment stats above connection-level for every involved connection.
2125                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
2126                 }
2127                 for c := range p.dirtiers {
2128                         c._stats.incrementPiecesDirtiedGood()
2129                 }
2130                 t.clearPieceTouchers(piece)
2131                 hasDirty := p.hasDirtyChunks()
2132                 t.cl.unlock()
2133                 if hasDirty {
2134                         p.Flush() // You can be synchronous here!
2135                 }
2136                 err := p.Storage().MarkComplete()
2137                 if err != nil {
2138                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
2139                 }
2140                 t.cl.lock()
2141
2142                 if t.closed.IsSet() {
2143                         return
2144                 }
2145                 t.pendAllChunkSpecs(piece)
2146         } else {
2147                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
2148                         // Peers contributed to all the data for this piece hash failure, and the failure was
2149                         // not due to errors in the storage (such as data being dropped in a cache).
2150
2151                         // Increment Torrent and above stats, and then specific connections.
2152                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
2153                         for c := range p.dirtiers {
2154                                 // Y u do dis peer?!
2155                                 c.stats().incrementPiecesDirtiedBad()
2156                         }
2157
2158                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
2159                         for c := range p.dirtiers {
2160                                 if !c.trusted {
2161                                         bannableTouchers = append(bannableTouchers, c)
2162                                 }
2163                         }
2164                         t.clearPieceTouchers(piece)
2165                         slices.Sort(bannableTouchers, connLessTrusted)
2166
2167                         if t.cl.config.Debug {
2168                                 t.logger.Printf(
2169                                         "bannable conns by trust for piece %d: %v",
2170                                         piece,
2171                                         func() (ret []connectionTrust) {
2172                                                 for _, c := range bannableTouchers {
2173                                                         ret = append(ret, c.trust())
2174                                                 }
2175                                                 return
2176                                         }(),
2177                                 )
2178                         }
2179
2180                         if len(bannableTouchers) >= 1 {
2181                                 c := bannableTouchers[0]
2182                                 if len(bannableTouchers) != 1 {
2183                                         t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
2184                                 } else {
2185                                         // Turns out it's still useful to ban peers like this because if there's only a
2186                                         // single peer for a piece, and we never progress that piece to completion, we
2187                                         // will never smart-ban them. Discovered in
2188                                         // https://github.com/anacrolix/torrent/issues/715.
2189                                         t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
2190                                         c.ban()
2191                                 }
2192                         }
2193                 }
2194                 t.onIncompletePiece(piece)
2195                 p.Storage().MarkNotComplete()
2196         }
2197         t.updatePieceCompletion(piece)
2198 }
2199
2200 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
2201         start := t.pieceRequestIndexOffset(piece)
2202         end := start + t.pieceNumChunks(piece)
2203         for ri := start; ri < end; ri++ {
2204                 t.cancelRequest(ri)
2205         }
2206 }
2207
2208 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
2209         t.pendAllChunkSpecs(piece)
2210         t.cancelRequestsForPiece(piece)
2211         t.piece(piece).readerCond.Broadcast()
2212         for conn := range t.conns {
2213                 conn.have(piece)
2214                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
2215         }
2216 }
2217
2218 // Called when a piece is found to be not complete.
2219 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2220         if t.pieceAllDirty(piece) {
2221                 t.pendAllChunkSpecs(piece)
2222         }
2223         if !t.wantPieceIndex(piece) {
2224                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2225                 return
2226         }
2227         // We could drop any connections that we told we have a piece that we
2228         // don't here. But there's a test failure, and it seems clients don't care
2229         // if you request pieces that you already claim to have. Pruning bad
2230         // connections might just remove any connections that aren't treating us
2231         // favourably anyway.
2232
2233         // for c := range t.conns {
2234         //      if c.sentHave(piece) {
2235         //              c.drop()
2236         //      }
2237         // }
2238         t.iterPeers(func(conn *Peer) {
2239                 if conn.peerHasPiece(piece) {
2240                         conn.updateRequests("piece incomplete")
2241                 }
2242         })
2243 }
2244
2245 func (t *Torrent) tryCreateMorePieceHashers() {
2246         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
2247         }
2248 }
2249
2250 func (t *Torrent) tryCreatePieceHasher() bool {
2251         if t.storage == nil {
2252                 return false
2253         }
2254         pi, ok := t.getPieceToHash()
2255         if !ok {
2256                 return false
2257         }
2258         p := t.piece(pi)
2259         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2260         p.hashing = true
2261         t.publishPieceChange(pi)
2262         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2263         t.storageLock.RLock()
2264         t.activePieceHashes++
2265         go t.pieceHasher(pi)
2266         return true
2267 }
2268
2269 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2270         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2271                 if t.piece(i).hashing {
2272                         return true
2273                 }
2274                 ret = i
2275                 ok = true
2276                 return false
2277         })
2278         return
2279 }
2280
2281 func (t *Torrent) dropBannedPeers() {
2282         t.iterPeers(func(p *Peer) {
2283                 remoteIp := p.remoteIp()
2284                 if remoteIp == nil {
2285                         if p.bannableAddr.Ok {
2286                                 t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
2287                         }
2288                         return
2289                 }
2290                 netipAddr := netip.MustParseAddr(remoteIp.String())
2291                 if Some(netipAddr) != p.bannableAddr {
2292                         t.logger.WithDefaultLevel(log.Debug).Printf(
2293                                 "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
2294                                 p, remoteIp, p.bannableAddr)
2295                 }
2296                 if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
2297                         // Should this be a close?
2298                         p.drop()
2299                         t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
2300                 }
2301         })
2302 }
2303
2304 func (t *Torrent) pieceHasher(index pieceIndex) {
2305         p := t.piece(index)
2306         sum, failedPeers, copyErr := t.hashPiece(index)
2307         correct := sum == *p.hash
2308         switch copyErr {
2309         case nil, io.EOF:
2310         default:
2311                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2312         }
2313         t.storageLock.RUnlock()
2314         t.cl.lock()
2315         defer t.cl.unlock()
2316         if correct {
2317                 for peer := range failedPeers {
2318                         t.cl.banPeerIP(peer.AsSlice())
2319                         t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
2320                 }
2321                 t.dropBannedPeers()
2322                 for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
2323                         t.smartBanCache.ForgetBlock(ri)
2324                 }
2325         }
2326         p.hashing = false
2327         t.pieceHashed(index, correct, copyErr)
2328         t.updatePiecePriority(index, "Torrent.pieceHasher")
2329         t.activePieceHashes--
2330         t.tryCreateMorePieceHashers()
2331 }
2332
2333 // Return the connections that touched a piece, and clear the entries while doing it.
2334 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2335         p := t.piece(pi)
2336         for c := range p.dirtiers {
2337                 delete(c.peerTouchedPieces, pi)
2338                 delete(p.dirtiers, c)
2339         }
2340 }
2341
2342 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2343         t.iterPeers(func(p *Peer) {
2344                 ret = append(ret, p)
2345         })
2346         return
2347 }
2348
2349 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2350         piece := t.piece(pieceIndex)
2351         if piece.queuedForHash() {
2352                 return
2353         }
2354         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2355         t.publishPieceChange(pieceIndex)
2356         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2357         t.tryCreateMorePieceHashers()
2358 }
2359
2360 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2361 // before the Info is available.
2362 func (t *Torrent) VerifyData() {
2363         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2364                 t.Piece(i).VerifyData()
2365         }
2366 }
2367
2368 func (t *Torrent) connectingToPeerAddr(addrStr string) bool {
2369         return len(t.halfOpen[addrStr]) != 0
2370 }
2371
2372 func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool {
2373         addrStr := x.String()
2374         for c := range t.conns {
2375                 ra := c.RemoteAddr
2376                 if ra.String() == addrStr {
2377                         return true
2378                 }
2379         }
2380         return false
2381 }
2382
2383 func (t *Torrent) getHalfOpenPath(
2384         addrStr string,
2385         attemptKey outgoingConnAttemptKey,
2386 ) nestedmaps.Path[*PeerInfo] {
2387         return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey)
2388 }
2389
2390 func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) {
2391         path := t.getHalfOpenPath(addrStr, attemptKey)
2392         if path.Exists() {
2393                 panic("should be unique")
2394         }
2395         path.Set(attemptKey)
2396         t.cl.numHalfOpen++
2397 }
2398
2399 // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not
2400 // sure all the PeerInfo fields are being used.
2401 func (t *Torrent) initiateConn(
2402         peer PeerInfo,
2403         requireRendezvous bool,
2404         skipHolepunchRendezvous bool,
2405         ignoreLimits bool,
2406         receivedHolepunchConnect bool,
2407 ) {
2408         if peer.Id == t.cl.peerID {
2409                 return
2410         }
2411         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2412                 return
2413         }
2414         addr := peer.Addr
2415         addrStr := addr.String()
2416         if !ignoreLimits {
2417                 if t.connectingToPeerAddr(addrStr) {
2418                         return
2419                 }
2420         }
2421         if t.hasPeerConnForAddr(addr) {
2422                 return
2423         }
2424         attemptKey := &peer
2425         t.addHalfOpen(addrStr, attemptKey)
2426         go t.cl.outgoingConnection(
2427                 outgoingConnOpts{
2428                         t:                        t,
2429                         addr:                     peer.Addr,
2430                         requireRendezvous:        requireRendezvous,
2431                         skipHolepunchRendezvous:  skipHolepunchRendezvous,
2432                         receivedHolepunchConnect: receivedHolepunchConnect,
2433                 },
2434                 peer.Source,
2435                 peer.Trusted,
2436                 attemptKey,
2437         )
2438 }
2439
2440 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2441 // quickly make one Client visible to the Torrent of another Client.
2442 func (t *Torrent) AddClientPeer(cl *Client) int {
2443         return t.AddPeers(func() (ps []PeerInfo) {
2444                 for _, la := range cl.ListenAddrs() {
2445                         ps = append(ps, PeerInfo{
2446                                 Addr:    la,
2447                                 Trusted: true,
2448                         })
2449                 }
2450                 return
2451         }())
2452 }
2453
2454 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2455 // connection.
2456 func (t *Torrent) allStats(f func(*ConnStats)) {
2457         f(&t.stats)
2458         f(&t.cl.connStats)
2459 }
2460
2461 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2462         return t.pieces[i].hashing
2463 }
2464
2465 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2466         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2467 }
2468
2469 func (t *Torrent) dialTimeout() time.Duration {
2470         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2471 }
2472
2473 func (t *Torrent) piece(i int) *Piece {
2474         return &t.pieces[i]
2475 }
2476
2477 func (t *Torrent) onWriteChunkErr(err error) {
2478         if t.userOnWriteChunkErr != nil {
2479                 go t.userOnWriteChunkErr(err)
2480                 return
2481         }
2482         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2483         t.disallowDataDownloadLocked()
2484 }
2485
2486 func (t *Torrent) DisallowDataDownload() {
2487         t.disallowDataDownloadLocked()
2488 }
2489
2490 func (t *Torrent) disallowDataDownloadLocked() {
2491         t.dataDownloadDisallowed.Set()
2492 }
2493
2494 func (t *Torrent) AllowDataDownload() {
2495         t.dataDownloadDisallowed.Clear()
2496 }
2497
2498 // Enables uploading data, if it was disabled.
2499 func (t *Torrent) AllowDataUpload() {
2500         t.cl.lock()
2501         defer t.cl.unlock()
2502         t.dataUploadDisallowed = false
2503         for c := range t.conns {
2504                 c.updateRequests("allow data upload")
2505         }
2506 }
2507
2508 // Disables uploading data, if it was enabled.
2509 func (t *Torrent) DisallowDataUpload() {
2510         t.cl.lock()
2511         defer t.cl.unlock()
2512         t.dataUploadDisallowed = true
2513         for c := range t.conns {
2514                 // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
2515                 c.updateRequests("disallow data upload")
2516         }
2517 }
2518
2519 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2520 // or if nil, a critical message is logged, and data download is disabled.
2521 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2522         t.cl.lock()
2523         defer t.cl.unlock()
2524         t.userOnWriteChunkErr = f
2525 }
2526
2527 func (t *Torrent) iterPeers(f func(p *Peer)) {
2528         for pc := range t.conns {
2529                 f(&pc.Peer)
2530         }
2531         for _, ws := range t.webSeeds {
2532                 f(ws)
2533         }
2534 }
2535
2536 func (t *Torrent) callbacks() *Callbacks {
2537         return &t.cl.config.Callbacks
2538 }
2539
2540 type AddWebSeedsOpt func(*webseed.Client)
2541
2542 // Sets the WebSeed trailing path escaper for a webseed.Client.
2543 func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
2544         return func(c *webseed.Client) {
2545                 c.PathEscaper = custom
2546         }
2547 }
2548
2549 func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
2550         t.cl.lock()
2551         defer t.cl.unlock()
2552         for _, u := range urls {
2553                 t.addWebSeed(u, opts...)
2554         }
2555 }
2556
2557 func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
2558         if t.cl.config.DisableWebseeds {
2559                 return
2560         }
2561         if _, ok := t.webSeeds[url]; ok {
2562                 return
2563         }
2564         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2565         // right away. This value should be some multiple of the number of connections to a host. I
2566         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2567         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2568         // "https://webtorrent.io/torrents/".
2569         const maxRequests = 16
2570         ws := webseedPeer{
2571                 peer: Peer{
2572                         t:                        t,
2573                         outgoing:                 true,
2574                         Network:                  "http",
2575                         reconciledHandshakeStats: true,
2576                         // This should affect how often we have to recompute requests for this peer. Note that
2577                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2578                         // requests mark more often, so recomputation is probably sooner than with regular peer
2579                         // conns. ~4x maxRequests would be about right.
2580                         PeerMaxRequests: 128,
2581                         // TODO: Set ban prefix?
2582                         RemoteAddr: remoteAddrFromUrl(url),
2583                         callbacks:  t.callbacks(),
2584                 },
2585                 client: webseed.Client{
2586                         HttpClient: t.cl.httpClient,
2587                         Url:        url,
2588                         ResponseBodyWrapper: func(r io.Reader) io.Reader {
2589                                 return &rateLimitedReader{
2590                                         l: t.cl.config.DownloadRateLimiter,
2591                                         r: r,
2592                                 }
2593                         },
2594                 },
2595                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2596         }
2597         ws.peer.initRequestState()
2598         for _, opt := range opts {
2599                 opt(&ws.client)
2600         }
2601         ws.peer.initUpdateRequestsTimer()
2602         ws.requesterCond.L = t.cl.locker()
2603         for i := 0; i < maxRequests; i += 1 {
2604                 go ws.requester(i)
2605         }
2606         for _, f := range t.callbacks().NewPeer {
2607                 f(&ws.peer)
2608         }
2609         ws.peer.logger = t.logger.WithContextValue(&ws)
2610         ws.peer.peerImpl = &ws
2611         if t.haveInfo() {
2612                 ws.onGotInfo(t.info)
2613         }
2614         t.webSeeds[url] = &ws.peer
2615 }
2616
2617 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2618         t.iterPeers(func(p1 *Peer) {
2619                 if p1 == p {
2620                         active = true
2621                 }
2622         })
2623         return
2624 }
2625
2626 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2627         index := t.pieceIndexOfRequestIndex(ri)
2628         return Request{
2629                 pp.Integer(index),
2630                 t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2631         }
2632 }
2633
2634 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2635         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
2636 }
2637
2638 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2639         return RequestIndex(piece) * t.chunksPerRegularPiece()
2640 }
2641
2642 func (t *Torrent) updateComplete() {
2643         t.Complete.SetBool(t.haveAllPieces())
2644 }
2645
2646 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
2647         p := t.requestingPeer(r)
2648         if p != nil {
2649                 p.cancel(r)
2650         }
2651         // TODO: This is a check that an old invariant holds. It can be removed after some testing.
2652         //delete(t.pendingRequests, r)
2653         if _, ok := t.requestState[r]; ok {
2654                 panic("expected request state to be gone")
2655         }
2656         return p
2657 }
2658
2659 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
2660         return t.requestState[r].peer
2661 }
2662
2663 func (t *Torrent) addConnWithAllPieces(p *Peer) {
2664         if t.connsWithAllPieces == nil {
2665                 t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
2666         }
2667         t.connsWithAllPieces[p] = struct{}{}
2668 }
2669
2670 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
2671         _, ok := t.connsWithAllPieces[p]
2672         delete(t.connsWithAllPieces, p)
2673         return ok
2674 }
2675
2676 func (t *Torrent) numActivePeers() int {
2677         return len(t.conns) + len(t.webSeeds)
2678 }
2679
2680 func (t *Torrent) hasStorageCap() bool {
2681         f := t.storage.Capacity
2682         if f == nil {
2683                 return false
2684         }
2685         _, ok := (*f)()
2686         return ok
2687 }
2688
2689 func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
2690         return pieceIndex(ri / t.chunksPerRegularPiece())
2691 }
2692
2693 func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
2694         reuseIter *typedRoaring.Iterator[RequestIndex],
2695         piece pieceIndex,
2696         f func(RequestIndex),
2697 ) {
2698         reuseIter.Initialize(&t.dirtyChunks)
2699         pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
2700         iterBitmapUnsetInRange(
2701                 reuseIter,
2702                 pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
2703                 f,
2704         )
2705 }
2706
2707 type requestState struct {
2708         peer *Peer
2709         when time.Time
2710 }
2711
2712 // Returns an error if a received chunk is out of bounds in someway.
2713 func (t *Torrent) checkValidReceiveChunk(r Request) error {
2714         if !t.haveInfo() {
2715                 return errors.New("torrent missing info")
2716         }
2717         if int(r.Index) >= t.numPieces() {
2718                 return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
2719         }
2720         pieceLength := t.pieceLength(pieceIndex(r.Index))
2721         if r.Begin >= pieceLength {
2722                 return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
2723         }
2724         // We could check chunk lengths here, but chunk request size is not changed often, and tricky
2725         // for peers to manipulate as they need to send potentially large buffers to begin with. There
2726         // should be considerable checks elsewhere for this case due to the network overhead. We should
2727         // catch most of the overflow manipulation stuff by checking index and begin above.
2728         return nil
2729 }
2730
2731 func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
2732         for pc := range t.conns {
2733                 dialAddr, err := pc.remoteDialAddrPort()
2734                 if err != nil {
2735                         continue
2736                 }
2737                 if dialAddr != target {
2738                         continue
2739                 }
2740                 ret = append(ret, pc)
2741         }
2742         return
2743 }
2744
2745 func makeUtHolepunchMsgForPeerConn(
2746         recipient *PeerConn,
2747         msgType utHolepunch.MsgType,
2748         addrPort netip.AddrPort,
2749         errCode utHolepunch.ErrCode,
2750 ) pp.Message {
2751         utHolepunchMsg := utHolepunch.Msg{
2752                 MsgType:  msgType,
2753                 AddrPort: addrPort,
2754                 ErrCode:  errCode,
2755         }
2756         extendedPayload, err := utHolepunchMsg.MarshalBinary()
2757         if err != nil {
2758                 panic(err)
2759         }
2760         return pp.Message{
2761                 Type:            pp.Extended,
2762                 ExtendedID:      MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName),
2763                 ExtendedPayload: extendedPayload,
2764         }
2765 }
2766
2767 func sendUtHolepunchMsg(
2768         pc *PeerConn,
2769         msgType utHolepunch.MsgType,
2770         addrPort netip.AddrPort,
2771         errCode utHolepunch.ErrCode,
2772 ) {
2773         pc.write(makeUtHolepunchMsgForPeerConn(pc, msgType, addrPort, errCode))
2774 }
2775
2776 func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error {
2777         switch msg.MsgType {
2778         case utHolepunch.Rendezvous:
2779                 t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
2780                 sendMsg := sendUtHolepunchMsg
2781                 senderAddrPort, err := sender.remoteDialAddrPort()
2782                 if err != nil {
2783                         sender.logger.Levelf(
2784                                 log.Warning,
2785                                 "error getting ut_holepunch rendezvous sender's dial address: %v",
2786                                 err,
2787                         )
2788                         // There's no better error code. The sender's address itself is invalid. I don't see
2789                         // this error message being appropriate anywhere else anyway.
2790                         sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
2791                 }
2792                 targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
2793                 if len(targets) == 0 {
2794                         sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
2795                         return nil
2796                 }
2797                 for _, pc := range targets {
2798                         if !pc.supportsExtension(utHolepunch.ExtensionName) {
2799                                 sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport)
2800                                 continue
2801                         }
2802                         sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
2803                         sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
2804                 }
2805                 return nil
2806         case utHolepunch.Connect:
2807                 t.logger.Printf("got holepunch connect from %v for %v", sender, msg.AddrPort)
2808                 rz, ok := t.utHolepunchRendezvous[msg.AddrPort]
2809                 if ok {
2810                         delete(rz.relays, sender)
2811                         rz.gotConnect.Set()
2812                         rz.relayCond.Broadcast()
2813                 } else {
2814                         // If the rendezvous was removed because we timed out or already got a connect signal,
2815                         // it doesn't hurt to try again.
2816                         t.initiateConn(PeerInfo{
2817                                 Addr:   msg.AddrPort,
2818                                 Source: PeerSourceUtHolepunch,
2819                         }, false, true, true, true)
2820                 }
2821                 return nil
2822         case utHolepunch.Error:
2823                 rz, ok := t.utHolepunchRendezvous[msg.AddrPort]
2824                 if ok {
2825                         delete(rz.relays, sender)
2826                         rz.relayCond.Broadcast()
2827                 }
2828                 t.logger.Printf("received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
2829                 return nil
2830         default:
2831                 return fmt.Errorf("unhandled msg type %v", msg.MsgType)
2832         }
2833 }
2834
2835 func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolepunchRendezvous, err error) {
2836         if MapContains(t.utHolepunchRendezvous, addrPort) {
2837                 err = errors.New("rendezvous already exists")
2838                 return
2839         }
2840         g.InitNew(&rz)
2841         for pc := range t.conns {
2842                 if !pc.supportsExtension(utHolepunch.ExtensionName) {
2843                         continue
2844                 }
2845                 if pc.supportsExtension(pp.ExtensionNamePex) {
2846                         if !g.MapContains(pc.pex.remoteLiveConns, addrPort) {
2847                                 continue
2848                         }
2849                 }
2850                 sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0)
2851                 MakeMapIfNilAndSet(&rz.relays, pc, struct{}{})
2852         }
2853         if len(rz.relays) == 0 {
2854                 err = fmt.Errorf("no eligible relays")
2855                 return
2856         }
2857         if !MakeMapIfNilAndSet(&t.utHolepunchRendezvous, addrPort, rz) {
2858                 panic("expected to fail earlier if rendezvous already exists")
2859         }
2860         return
2861 }
2862
2863 func (t *Torrent) numHalfOpenAttempts() (num int) {
2864         for _, attempts := range t.halfOpen {
2865                 num += len(attempts)
2866         }
2867         return
2868 }