]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
cmd/torrent download: Propagate write errors
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "math/rand"
12         "net/netip"
13         "net/url"
14         "sort"
15         "strings"
16         "text/tabwriter"
17         "time"
18         "unsafe"
19
20         "github.com/RoaringBitmap/roaring"
21         "github.com/anacrolix/chansync"
22         "github.com/anacrolix/chansync/events"
23         "github.com/anacrolix/dht/v2"
24         . "github.com/anacrolix/generics"
25         g "github.com/anacrolix/generics"
26         "github.com/anacrolix/log"
27         "github.com/anacrolix/missinggo/slices"
28         "github.com/anacrolix/missinggo/v2"
29         "github.com/anacrolix/missinggo/v2/bitmap"
30         "github.com/anacrolix/missinggo/v2/pubsub"
31         "github.com/anacrolix/multiless"
32         "github.com/anacrolix/sync"
33         "github.com/pion/datachannel"
34         "golang.org/x/exp/maps"
35
36         "github.com/anacrolix/torrent/bencode"
37         "github.com/anacrolix/torrent/internal/check"
38         "github.com/anacrolix/torrent/internal/nestedmaps"
39         "github.com/anacrolix/torrent/metainfo"
40         pp "github.com/anacrolix/torrent/peer_protocol"
41         utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
42         request_strategy "github.com/anacrolix/torrent/request-strategy"
43         "github.com/anacrolix/torrent/storage"
44         "github.com/anacrolix/torrent/tracker"
45         typedRoaring "github.com/anacrolix/torrent/typed-roaring"
46         "github.com/anacrolix/torrent/webseed"
47         "github.com/anacrolix/torrent/webtorrent"
48 )
49
50 // Maintains state of torrent within a Client. Many methods should not be called before the info is
51 // available, see .Info and .GotInfo.
52 type Torrent struct {
53         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
54         // alignment. See #262.
55         stats  ConnStats
56         cl     *Client
57         logger log.Logger
58
59         networkingEnabled      chansync.Flag
60         dataDownloadDisallowed chansync.Flag
61         dataUploadDisallowed   bool
62         userOnWriteChunkErr    func(error)
63
64         closed   chansync.SetOnce
65         onClose  []func()
66         infoHash metainfo.Hash
67         pieces   []Piece
68
69         // The order pieces are requested if there's no stronger reason like availability or priority.
70         pieceRequestOrder []int
71         // Values are the piece indices that changed.
72         pieceStateChanges pubsub.PubSub[PieceStateChange]
73         // The size of chunks to request from peers over the wire. This is
74         // normally 16KiB by convention these days.
75         chunkSize pp.Integer
76         chunkPool sync.Pool
77         // Total length of the torrent in bytes. Stored because it's not O(1) to
78         // get this from the info dict.
79         _length Option[int64]
80
81         // The storage to open when the info dict becomes available.
82         storageOpener *storage.Client
83         // Storage for torrent data.
84         storage *storage.Torrent
85         // Read-locked for using storage, and write-locked for Closing.
86         storageLock sync.RWMutex
87
88         // TODO: Only announce stuff is used?
89         metainfo metainfo.MetaInfo
90
91         // The info dict. nil if we don't have it (yet).
92         info  *metainfo.Info
93         files *[]*File
94
95         _chunksPerRegularPiece chunkIndexType
96
97         webSeeds map[string]*Peer
98         // Active peer connections, running message stream loops. TODO: Make this
99         // open (not-closed) connections only.
100         conns               map[*PeerConn]struct{}
101         maxEstablishedConns int
102         // Set of addrs to which we're attempting to connect. Connections are
103         // half-open until all handshakes are completed.
104         halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo
105
106         // Reserve of peers to connect to. A peer can be both here and in the
107         // active connections if were told about the peer after connecting with
108         // them. That encourages us to reconnect to peers that are well known in
109         // the swarm.
110         peers prioritizedPeers
111         // Whether we want to know more peers.
112         wantPeersEvent missinggo.Event
113         // An announcer for each tracker URL.
114         trackerAnnouncers map[string]torrentTrackerAnnouncer
115         // How many times we've initiated a DHT announce. TODO: Move into stats.
116         numDHTAnnounces int
117
118         // Name used if the info name isn't available. Should be cleared when the
119         // Info does become available.
120         nameMu      sync.RWMutex
121         displayName string
122
123         // The bencoded bytes of the info dict. This is actively manipulated if
124         // the info bytes aren't initially available, and we try to fetch them
125         // from peers.
126         metadataBytes []byte
127         // Each element corresponds to the 16KiB metadata pieces. If true, we have
128         // received that piece.
129         metadataCompletedChunks []bool
130         metadataChanged         sync.Cond
131
132         // Closed when .Info is obtained.
133         gotMetainfoC chan struct{}
134
135         readers                map[*reader]struct{}
136         _readerNowPieces       bitmap.Bitmap
137         _readerReadaheadPieces bitmap.Bitmap
138
139         // A cache of pieces we need to get. Calculated from various piece and
140         // file priorities and completion states elsewhere.
141         _pendingPieces roaring.Bitmap
142         // A cache of completed piece indices.
143         _completedPieces roaring.Bitmap
144         // Pieces that need to be hashed.
145         piecesQueuedForHash       bitmap.Bitmap
146         activePieceHashes         int
147         initialPieceCheckDisabled bool
148
149         connsWithAllPieces map[*Peer]struct{}
150
151         requestState map[RequestIndex]requestState
152         // Chunks we've written to since the corresponding piece was last checked.
153         dirtyChunks typedRoaring.Bitmap[RequestIndex]
154
155         pex pexState
156
157         // Is On when all pieces are complete.
158         Complete chansync.Flag
159
160         // Torrent sources in use keyed by the source string.
161         activeSources sync.Map
162         sourcesLogger log.Logger
163
164         smartBanCache smartBanCache
165
166         // Large allocations reused between request state updates.
167         requestPieceStates []request_strategy.PieceRequestOrderState
168         requestIndexes     []RequestIndex
169
170         disableTriggers bool
171 }
172
173 type outgoingConnAttemptKey = *PeerInfo
174
175 func (t *Torrent) length() int64 {
176         return t._length.Value
177 }
178
179 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
180         // This could be done with roaring.BitSliceIndexing.
181         t.iterPeers(func(peer *Peer) {
182                 if _, ok := t.connsWithAllPieces[peer]; ok {
183                         return
184                 }
185                 if peer.peerHasPiece(i) {
186                         count++
187                 }
188         })
189         return
190 }
191
192 func (t *Torrent) decPieceAvailability(i pieceIndex) {
193         if !t.haveInfo() {
194                 return
195         }
196         p := t.piece(i)
197         if p.relativeAvailability <= 0 {
198                 panic(p.relativeAvailability)
199         }
200         p.relativeAvailability--
201         t.updatePieceRequestOrderPiece(i)
202 }
203
204 func (t *Torrent) incPieceAvailability(i pieceIndex) {
205         // If we don't the info, this should be reconciled when we do.
206         if t.haveInfo() {
207                 p := t.piece(i)
208                 p.relativeAvailability++
209                 t.updatePieceRequestOrderPiece(i)
210         }
211 }
212
213 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
214         return t._readerNowPieces
215 }
216
217 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
218         return t._readerReadaheadPieces
219 }
220
221 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
222         return !t.wantPieceIndex(i)
223 }
224
225 // Returns a channel that is closed when the Torrent is closed.
226 func (t *Torrent) Closed() events.Done {
227         return t.closed.Done()
228 }
229
230 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
231 // pending, and half-open peers.
232 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
233         // Add pending peers to the list
234         t.peers.Each(func(peer PeerInfo) {
235                 ks = append(ks, peer)
236         })
237
238         // Add half-open peers to the list
239         for _, attempts := range t.halfOpen {
240                 for _, peer := range attempts {
241                         ks = append(ks, *peer)
242                 }
243         }
244
245         // Add active peers to the list
246         t.cl.rLock()
247         defer t.cl.rUnlock()
248         for conn := range t.conns {
249                 ks = append(ks, PeerInfo{
250                         Id:     conn.PeerID,
251                         Addr:   conn.RemoteAddr,
252                         Source: conn.Discovery,
253                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
254                         // > But if we're not connected to them with an encrypted connection, I couldn't say
255                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
256                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
257                         // > connection. It's probably easiest to do the latter for now.
258                         // https://github.com/anacrolix/torrent/pull/188
259                         SupportsEncryption: conn.headerEncrypted,
260                 })
261         }
262
263         return
264 }
265
266 func (t *Torrent) setChunkSize(size pp.Integer) {
267         t.chunkSize = size
268         t.chunkPool = sync.Pool{
269                 New: func() interface{} {
270                         b := make([]byte, size)
271                         return &b
272                 },
273         }
274 }
275
276 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
277         return t._completedPieces.Contains(bitmap.BitIndex(piece))
278 }
279
280 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
281         if t.storage == nil {
282                 return storage.Completion{Complete: false, Ok: true}
283         }
284         return t.pieces[piece].Storage().Completion()
285 }
286
287 // There's a connection to that address already.
288 func (t *Torrent) addrActive(addr string) bool {
289         if _, ok := t.halfOpen[addr]; ok {
290                 return true
291         }
292         for c := range t.conns {
293                 ra := c.RemoteAddr
294                 if ra.String() == addr {
295                         return true
296                 }
297         }
298         return false
299 }
300
301 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
302         return t.appendConns(ret, func(conn *PeerConn) bool {
303                 return !conn.closed.IsSet()
304         })
305 }
306
307 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
308         for c := range t.conns {
309                 if f(c) {
310                         ret = append(ret, c)
311                 }
312         }
313         return ret
314 }
315
316 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
317         cl := t.cl
318         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
319         if t.closed.IsSet() {
320                 return false
321         }
322         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
323                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
324                         torrent.Add("peers not added because of bad addr", 1)
325                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
326                         return false
327                 }
328         }
329         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
330                 torrent.Add("peers replaced", 1)
331                 if !replaced.equal(p) {
332                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
333                         added = true
334                 }
335         } else {
336                 added = true
337         }
338         t.openNewConns()
339         for t.peers.Len() > cl.config.TorrentPeersHighWater {
340                 _, ok := t.peers.DeleteMin()
341                 if ok {
342                         torrent.Add("excess reserve peers discarded", 1)
343                 }
344         }
345         return
346 }
347
348 func (t *Torrent) invalidateMetadata() {
349         for i := 0; i < len(t.metadataCompletedChunks); i++ {
350                 t.metadataCompletedChunks[i] = false
351         }
352         t.nameMu.Lock()
353         t.gotMetainfoC = make(chan struct{})
354         t.info = nil
355         t.nameMu.Unlock()
356 }
357
358 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
359         if t.haveInfo() {
360                 return
361         }
362         if index >= len(t.metadataCompletedChunks) {
363                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
364                 return
365         }
366         copy(t.metadataBytes[(1<<14)*index:], data)
367         t.metadataCompletedChunks[index] = true
368 }
369
370 func (t *Torrent) metadataPieceCount() int {
371         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
372 }
373
374 func (t *Torrent) haveMetadataPiece(piece int) bool {
375         if t.haveInfo() {
376                 return (1<<14)*piece < len(t.metadataBytes)
377         } else {
378                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
379         }
380 }
381
382 func (t *Torrent) metadataSize() int {
383         return len(t.metadataBytes)
384 }
385
386 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
387         for i := 0; i < len(info.Pieces); i += sha1.Size {
388                 ret = append(ret, info.Pieces[i:i+sha1.Size])
389         }
390         return
391 }
392
393 func (t *Torrent) makePieces() {
394         hashes := infoPieceHashes(t.info)
395         t.pieces = make([]Piece, len(hashes))
396         for i, hash := range hashes {
397                 piece := &t.pieces[i]
398                 piece.t = t
399                 piece.index = pieceIndex(i)
400                 piece.noPendingWrites.L = &piece.pendingWritesMutex
401                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
402                 files := *t.files
403                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
404                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
405                 piece.files = files[beginFile:endFile]
406         }
407 }
408
409 // Returns the index of the first file containing the piece. files must be
410 // ordered by offset.
411 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
412         for i, f := range files {
413                 if f.offset+f.length > pieceOffset {
414                         return i
415                 }
416         }
417         return 0
418 }
419
420 // Returns the index after the last file containing the piece. files must be
421 // ordered by offset.
422 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
423         for i, f := range files {
424                 if f.offset+f.length >= pieceEndOffset {
425                         return i + 1
426                 }
427         }
428         return 0
429 }
430
431 func (t *Torrent) cacheLength() {
432         var l int64
433         for _, f := range t.info.UpvertedFiles() {
434                 l += f.Length
435         }
436         t._length = Some(l)
437 }
438
439 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
440 // separately.
441 func (t *Torrent) setInfo(info *metainfo.Info) error {
442         if err := validateInfo(info); err != nil {
443                 return fmt.Errorf("bad info: %s", err)
444         }
445         if t.storageOpener != nil {
446                 var err error
447                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
448                 if err != nil {
449                         return fmt.Errorf("error opening torrent storage: %s", err)
450                 }
451         }
452         t.nameMu.Lock()
453         t.info = info
454         t.nameMu.Unlock()
455         t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
456         t.updateComplete()
457         t.displayName = "" // Save a few bytes lol.
458         t.initFiles()
459         t.cacheLength()
460         t.makePieces()
461         return nil
462 }
463
464 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
465         return request_strategy.PieceRequestOrderKey{
466                 InfoHash: t.infoHash,
467                 Index:    i,
468         }
469 }
470
471 // This seems to be all the follow-up tasks after info is set, that can't fail.
472 func (t *Torrent) onSetInfo() {
473         t.pieceRequestOrder = rand.Perm(t.numPieces())
474         t.initPieceRequestOrder()
475         MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
476         for i := range t.pieces {
477                 p := &t.pieces[i]
478                 // Need to add relativeAvailability before updating piece completion, as that may result in conns
479                 // being dropped.
480                 if p.relativeAvailability != 0 {
481                         panic(p.relativeAvailability)
482                 }
483                 p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
484                 t.addRequestOrderPiece(i)
485                 t.updatePieceCompletion(i)
486                 if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
487                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
488                         t.queuePieceCheck(i)
489                 }
490         }
491         t.cl.event.Broadcast()
492         close(t.gotMetainfoC)
493         t.updateWantPeersEvent()
494         t.requestState = make(map[RequestIndex]requestState)
495         t.tryCreateMorePieceHashers()
496         t.iterPeers(func(p *Peer) {
497                 p.onGotInfo(t.info)
498                 p.updateRequests("onSetInfo")
499         })
500 }
501
502 // Called when metadata for a torrent becomes available.
503 func (t *Torrent) setInfoBytesLocked(b []byte) error {
504         if metainfo.HashBytes(b) != t.infoHash {
505                 return errors.New("info bytes have wrong hash")
506         }
507         var info metainfo.Info
508         if err := bencode.Unmarshal(b, &info); err != nil {
509                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
510         }
511         t.metadataBytes = b
512         t.metadataCompletedChunks = nil
513         if t.info != nil {
514                 return nil
515         }
516         if err := t.setInfo(&info); err != nil {
517                 return err
518         }
519         t.onSetInfo()
520         return nil
521 }
522
523 func (t *Torrent) haveAllMetadataPieces() bool {
524         if t.haveInfo() {
525                 return true
526         }
527         if t.metadataCompletedChunks == nil {
528                 return false
529         }
530         for _, have := range t.metadataCompletedChunks {
531                 if !have {
532                         return false
533                 }
534         }
535         return true
536 }
537
538 // TODO: Propagate errors to disconnect peer.
539 func (t *Torrent) setMetadataSize(size int) (err error) {
540         if t.haveInfo() {
541                 // We already know the correct metadata size.
542                 return
543         }
544         if uint32(size) > maxMetadataSize {
545                 return log.WithLevel(log.Warning, errors.New("bad size"))
546         }
547         if len(t.metadataBytes) == size {
548                 return
549         }
550         t.metadataBytes = make([]byte, size)
551         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
552         t.metadataChanged.Broadcast()
553         for c := range t.conns {
554                 c.requestPendingMetadata()
555         }
556         return
557 }
558
559 // The current working name for the torrent. Either the name in the info dict,
560 // or a display name given such as by the dn value in a magnet link, or "".
561 func (t *Torrent) name() string {
562         t.nameMu.RLock()
563         defer t.nameMu.RUnlock()
564         if t.haveInfo() {
565                 return t.info.BestName()
566         }
567         if t.displayName != "" {
568                 return t.displayName
569         }
570         return "infohash:" + t.infoHash.HexString()
571 }
572
573 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
574         p := &t.pieces[index]
575         ret.Priority = t.piecePriority(index)
576         ret.Completion = p.completion()
577         ret.QueuedForHash = p.queuedForHash()
578         ret.Hashing = p.hashing
579         ret.Checking = ret.QueuedForHash || ret.Hashing
580         ret.Marking = p.marking
581         if !ret.Complete && t.piecePartiallyDownloaded(index) {
582                 ret.Partial = true
583         }
584         return
585 }
586
587 func (t *Torrent) metadataPieceSize(piece int) int {
588         return metadataPieceSize(len(t.metadataBytes), piece)
589 }
590
591 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
592         return pp.Message{
593                 Type:       pp.Extended,
594                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
595                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
596                         Piece:     piece,
597                         TotalSize: len(t.metadataBytes),
598                         Type:      msgType,
599                 }), data...),
600         }
601 }
602
603 type pieceAvailabilityRun struct {
604         Count        pieceIndex
605         Availability int
606 }
607
608 func (me pieceAvailabilityRun) String() string {
609         return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
610 }
611
612 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
613         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
614                 ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
615         })
616         for i := range t.pieces {
617                 rle.Append(t.pieces[i].availability(), 1)
618         }
619         rle.Flush()
620         return
621 }
622
623 func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
624         freqs = make([]int, t.numActivePeers()+1)
625         for i := range t.pieces {
626                 freqs[t.piece(i).availability()]++
627         }
628         return
629 }
630
631 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
632         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
633                 ret = append(ret, PieceStateRun{
634                         PieceState: el.(PieceState),
635                         Length:     int(count),
636                 })
637         })
638         for index := range t.pieces {
639                 rle.Append(t.pieceState(pieceIndex(index)), 1)
640         }
641         rle.Flush()
642         return
643 }
644
645 // Produces a small string representing a PieceStateRun.
646 func (psr PieceStateRun) String() (ret string) {
647         ret = fmt.Sprintf("%d", psr.Length)
648         ret += func() string {
649                 switch psr.Priority {
650                 case PiecePriorityNext:
651                         return "N"
652                 case PiecePriorityNormal:
653                         return "."
654                 case PiecePriorityReadahead:
655                         return "R"
656                 case PiecePriorityNow:
657                         return "!"
658                 case PiecePriorityHigh:
659                         return "H"
660                 default:
661                         return ""
662                 }
663         }()
664         if psr.Hashing {
665                 ret += "H"
666         }
667         if psr.QueuedForHash {
668                 ret += "Q"
669         }
670         if psr.Marking {
671                 ret += "M"
672         }
673         if psr.Partial {
674                 ret += "P"
675         }
676         if psr.Complete {
677                 ret += "C"
678         }
679         if !psr.Ok {
680                 ret += "?"
681         }
682         return
683 }
684
685 func (t *Torrent) writeStatus(w io.Writer) {
686         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
687         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
688         if !t.haveInfo() {
689                 fmt.Fprintf(w, "Metadata have: ")
690                 for _, h := range t.metadataCompletedChunks {
691                         fmt.Fprintf(w, "%c", func() rune {
692                                 if h {
693                                         return 'H'
694                                 } else {
695                                         return '.'
696                                 }
697                         }())
698                 }
699                 fmt.Fprintln(w)
700         }
701         fmt.Fprintf(w, "Piece length: %s\n",
702                 func() string {
703                         if t.haveInfo() {
704                                 return fmt.Sprintf("%v (%v chunks)",
705                                         t.usualPieceSize(),
706                                         float64(t.usualPieceSize())/float64(t.chunkSize))
707                         } else {
708                                 return "no info"
709                         }
710                 }(),
711         )
712         if t.info != nil {
713                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
714                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
715                 // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
716                 // availability frequencies instead.
717                 if false {
718                         fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
719                                 for _, run := range t.pieceAvailabilityRuns() {
720                                         ret = append(ret, run.String())
721                                 }
722                                 return
723                         }(), " "))
724                 }
725                 fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
726                         func() (ret []string) {
727                                 for avail, freq := range t.pieceAvailabilityFrequencies() {
728                                         if freq == 0 {
729                                                 continue
730                                         }
731                                         ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
732                                 }
733                                 return
734                         }(),
735                         ", "))
736         }
737         fmt.Fprintf(w, "Reader Pieces:")
738         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
739                 fmt.Fprintf(w, " %d:%d", begin, end)
740                 return true
741         })
742         fmt.Fprintln(w)
743
744         fmt.Fprintf(w, "Enabled trackers:\n")
745         func() {
746                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
747                 fmt.Fprintf(tw, "    URL\tExtra\n")
748                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
749                         lu := l.URL()
750                         ru := r.URL()
751                         var luns, runs url.URL = *lu, *ru
752                         luns.Scheme = ""
753                         runs.Scheme = ""
754                         var ml missinggo.MultiLess
755                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
756                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
757                         return ml.Less()
758                 }).([]torrentTrackerAnnouncer) {
759                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
760                 }
761                 tw.Flush()
762         }()
763
764         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
765
766         dumpStats(w, t.statsLocked())
767
768         fmt.Fprintf(w, "webseeds:\n")
769         t.writePeerStatuses(w, maps.Values(t.webSeeds))
770
771         peerConns := maps.Keys(t.conns)
772         // Peers without priorities first, then those with. I'm undecided about how to order peers
773         // without priorities.
774         sort.Slice(peerConns, func(li, ri int) bool {
775                 l := peerConns[li]
776                 r := peerConns[ri]
777                 ml := multiless.New()
778                 lpp := g.ResultFromTuple(l.peerPriority()).ToOption()
779                 rpp := g.ResultFromTuple(r.peerPriority()).ToOption()
780                 ml = ml.Bool(lpp.Ok, rpp.Ok)
781                 ml = ml.Uint32(rpp.Value, lpp.Value)
782                 return ml.Less()
783         })
784
785         fmt.Fprintf(w, "%v peer conns:\n", len(peerConns))
786         t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer {
787                 return &pc.Peer
788         }))
789 }
790
791 func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) {
792         var buf bytes.Buffer
793         for _, c := range peers {
794                 fmt.Fprintf(w, "- ")
795                 buf.Reset()
796                 c.writeStatus(&buf)
797                 w.Write(bytes.TrimRight(
798                         bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n  ")),
799                         " "))
800         }
801 }
802
803 func (t *Torrent) haveInfo() bool {
804         return t.info != nil
805 }
806
807 // Returns a run-time generated MetaInfo that includes the info bytes and
808 // announce-list as currently known to the client.
809 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
810         return metainfo.MetaInfo{
811                 CreationDate: time.Now().Unix(),
812                 Comment:      "dynamic metainfo from client",
813                 CreatedBy:    "go.torrent",
814                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
815                 InfoBytes: func() []byte {
816                         if t.haveInfo() {
817                                 return t.metadataBytes
818                         } else {
819                                 return nil
820                         }
821                 }(),
822                 UrlList: func() []string {
823                         ret := make([]string, 0, len(t.webSeeds))
824                         for url := range t.webSeeds {
825                                 ret = append(ret, url)
826                         }
827                         return ret
828                 }(),
829         }
830 }
831
832 // Returns a count of bytes that are not complete in storage, and not pending being written to
833 // storage. This value is from the perspective of the download manager, and may not agree with the
834 // actual state in storage. If you want read data synchronously you should use a Reader. See
835 // https://github.com/anacrolix/torrent/issues/828.
836 func (t *Torrent) BytesMissing() (n int64) {
837         t.cl.rLock()
838         n = t.bytesMissingLocked()
839         t.cl.rUnlock()
840         return
841 }
842
843 func (t *Torrent) bytesMissingLocked() int64 {
844         return t.bytesLeft()
845 }
846
847 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
848         roaring.Flip(b, 0, end).Iterate(cb)
849 }
850
851 func (t *Torrent) bytesLeft() (left int64) {
852         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
853                 p := t.piece(pieceIndex(x))
854                 left += int64(p.length() - p.numDirtyBytes())
855                 return true
856         })
857         return
858 }
859
860 // Bytes left to give in tracker announces.
861 func (t *Torrent) bytesLeftAnnounce() int64 {
862         if t.haveInfo() {
863                 return t.bytesLeft()
864         } else {
865                 return -1
866         }
867 }
868
869 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
870         if t.pieceComplete(piece) {
871                 return false
872         }
873         if t.pieceAllDirty(piece) {
874                 return false
875         }
876         return t.pieces[piece].hasDirtyChunks()
877 }
878
879 func (t *Torrent) usualPieceSize() int {
880         return int(t.info.PieceLength)
881 }
882
883 func (t *Torrent) numPieces() pieceIndex {
884         return t.info.NumPieces()
885 }
886
887 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
888         return pieceIndex(t._completedPieces.GetCardinality())
889 }
890
891 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
892         if !t.closed.Set() {
893                 err = errors.New("already closed")
894                 return
895         }
896         for _, f := range t.onClose {
897                 f()
898         }
899         if t.storage != nil {
900                 wg.Add(1)
901                 go func() {
902                         defer wg.Done()
903                         t.storageLock.Lock()
904                         defer t.storageLock.Unlock()
905                         if f := t.storage.Close; f != nil {
906                                 err1 := f()
907                                 if err1 != nil {
908                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
909                                 }
910                         }
911                 }()
912         }
913         t.iterPeers(func(p *Peer) {
914                 p.close()
915         })
916         if t.storage != nil {
917                 t.deletePieceRequestOrder()
918         }
919         t.assertAllPiecesRelativeAvailabilityZero()
920         t.pex.Reset()
921         t.cl.event.Broadcast()
922         t.pieceStateChanges.Close()
923         t.updateWantPeersEvent()
924         return
925 }
926
927 func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
928         for i := range t.pieces {
929                 p := t.piece(i)
930                 if p.relativeAvailability != 0 {
931                         panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
932                 }
933         }
934 }
935
936 func (t *Torrent) requestOffset(r Request) int64 {
937         return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
938 }
939
940 // Return the request that would include the given offset into the torrent data. Returns !ok if
941 // there is no such request.
942 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
943         return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
944 }
945
946 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
947         //defer perf.ScopeTimerErr(&err)()
948         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
949         if err == nil && n != len(data) {
950                 err = io.ErrShortWrite
951         }
952         return err
953 }
954
955 func (t *Torrent) bitfield() (bf []bool) {
956         bf = make([]bool, t.numPieces())
957         t._completedPieces.Iterate(func(piece uint32) (again bool) {
958                 bf[piece] = true
959                 return true
960         })
961         return
962 }
963
964 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
965         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
966 }
967
968 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
969         return t._chunksPerRegularPiece
970 }
971
972 func (t *Torrent) numChunks() RequestIndex {
973         if t.numPieces() == 0 {
974                 return 0
975         }
976         return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
977 }
978
979 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
980         t.dirtyChunks.RemoveRange(
981                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
982                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
983 }
984
985 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
986         if t.info.PieceLength == 0 {
987                 // There will be no variance amongst pieces. Only pain.
988                 return 0
989         }
990         if piece == t.numPieces()-1 {
991                 ret := pp.Integer(t.length() % t.info.PieceLength)
992                 if ret != 0 {
993                         return ret
994                 }
995         }
996         return pp.Integer(t.info.PieceLength)
997 }
998
999 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
1000         return &blockCheckingWriter{
1001                 cache:        &t.smartBanCache,
1002                 requestIndex: t.pieceRequestIndexOffset(piece),
1003                 chunkSize:    t.chunkSize.Int(),
1004         }
1005 }
1006
1007 func (t *Torrent) hashPiece(piece pieceIndex) (
1008         ret metainfo.Hash,
1009         // These are peers that sent us blocks that differ from what we hash here.
1010         differingPeers map[bannableAddr]struct{},
1011         err error,
1012 ) {
1013         p := t.piece(piece)
1014         p.waitNoPendingWrites()
1015         storagePiece := t.pieces[piece].Storage()
1016
1017         // Does the backend want to do its own hashing?
1018         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
1019                 var sum metainfo.Hash
1020                 // log.Printf("A piece decided to self-hash: %d", piece)
1021                 sum, err = i.SelfHash()
1022                 missinggo.CopyExact(&ret, sum)
1023                 return
1024         }
1025
1026         hash := pieceHash.New()
1027         const logPieceContents = false
1028         smartBanWriter := t.smartBanBlockCheckingWriter(piece)
1029         writers := []io.Writer{hash, smartBanWriter}
1030         var examineBuf bytes.Buffer
1031         if logPieceContents {
1032                 writers = append(writers, &examineBuf)
1033         }
1034         _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
1035         if logPieceContents {
1036                 t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
1037         }
1038         smartBanWriter.Flush()
1039         differingPeers = smartBanWriter.badPeers
1040         missinggo.CopyExact(&ret, hash.Sum(nil))
1041         return
1042 }
1043
1044 func (t *Torrent) haveAnyPieces() bool {
1045         return !t._completedPieces.IsEmpty()
1046 }
1047
1048 func (t *Torrent) haveAllPieces() bool {
1049         if !t.haveInfo() {
1050                 return false
1051         }
1052         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
1053 }
1054
1055 func (t *Torrent) havePiece(index pieceIndex) bool {
1056         return t.haveInfo() && t.pieceComplete(index)
1057 }
1058
1059 func (t *Torrent) maybeDropMutuallyCompletePeer(
1060         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
1061         // okay?
1062         p *PeerConn,
1063 ) {
1064         if !t.cl.config.DropMutuallyCompletePeers {
1065                 return
1066         }
1067         if !t.haveAllPieces() {
1068                 return
1069         }
1070         if all, known := p.peerHasAllPieces(); !(known && all) {
1071                 return
1072         }
1073         if p.useful() {
1074                 return
1075         }
1076         p.logger.Levelf(log.Debug, "is mutually complete; dropping")
1077         p.drop()
1078 }
1079
1080 func (t *Torrent) haveChunk(r Request) (ret bool) {
1081         // defer func() {
1082         //      log.Println("have chunk", r, ret)
1083         // }()
1084         if !t.haveInfo() {
1085                 return false
1086         }
1087         if t.pieceComplete(pieceIndex(r.Index)) {
1088                 return true
1089         }
1090         p := &t.pieces[r.Index]
1091         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
1092 }
1093
1094 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
1095         return chunkIndexType(cs.Begin / chunkSize)
1096 }
1097
1098 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
1099         return !t._pendingPieces.IsEmpty() && t._pendingPieces.Contains(uint32(index))
1100 }
1101
1102 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1103 // conns (which is a map).
1104 var peerConnSlices sync.Pool
1105
1106 func getPeerConnSlice(cap int) []*PeerConn {
1107         getInterface := peerConnSlices.Get()
1108         if getInterface == nil {
1109                 return make([]*PeerConn, 0, cap)
1110         } else {
1111                 return getInterface.([]*PeerConn)[:0]
1112         }
1113 }
1114
1115 // Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
1116 // this is a frequent occurrence.
1117 func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
1118         sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
1119         f(sl)
1120         peerConnSlices.Put(sl)
1121 }
1122
1123 func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
1124         wcs := worseConnSlice{conns: sl}
1125         wcs.initKeys(opts)
1126         heap.Init(&wcs)
1127         for wcs.Len() != 0 {
1128                 c := heap.Pop(&wcs).(*PeerConn)
1129                 if opts.incomingIsBad && !c.outgoing {
1130                         return c
1131                 }
1132                 if opts.outgoingIsBad && c.outgoing {
1133                         return c
1134                 }
1135                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1136                         return c
1137                 }
1138                 // If the connection is in the worst half of the established
1139                 // connection quota and is older than a minute.
1140                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1141                         // Give connections 1 minute to prove themselves.
1142                         if time.Since(c.completedHandshake) > time.Minute {
1143                                 return c
1144                         }
1145                 }
1146         }
1147         return nil
1148 }
1149
1150 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1151 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1152 // established connections for more than a minute. This is O(n log n). If there was a way to not
1153 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1154 func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
1155         t.withUnclosedConns(func(ucs []*PeerConn) {
1156                 ret = t.worstBadConnFromSlice(opts, ucs)
1157         })
1158         return
1159 }
1160
1161 type PieceStateChange struct {
1162         Index int
1163         PieceState
1164 }
1165
1166 func (t *Torrent) publishPieceStateChange(piece pieceIndex) {
1167         t.cl._mu.Defer(func() {
1168                 cur := t.pieceState(piece)
1169                 p := &t.pieces[piece]
1170                 if cur != p.publicPieceState {
1171                         p.publicPieceState = cur
1172                         t.pieceStateChanges.Publish(PieceStateChange{
1173                                 int(piece),
1174                                 cur,
1175                         })
1176                 }
1177         })
1178 }
1179
1180 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1181         if t.pieceComplete(piece) {
1182                 return 0
1183         }
1184         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1185 }
1186
1187 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1188         return t.pieces[piece].allChunksDirty()
1189 }
1190
1191 func (t *Torrent) readersChanged() {
1192         t.updateReaderPieces()
1193         t.updateAllPiecePriorities("Torrent.readersChanged")
1194 }
1195
1196 func (t *Torrent) updateReaderPieces() {
1197         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1198 }
1199
1200 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1201         if from == to {
1202                 return
1203         }
1204         t.updateReaderPieces()
1205         // Order the ranges, high and low.
1206         l, h := from, to
1207         if l.begin > h.begin {
1208                 l, h = h, l
1209         }
1210         if l.end < h.begin {
1211                 // Two distinct ranges.
1212                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1213                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1214         } else {
1215                 // Ranges overlap.
1216                 end := l.end
1217                 if h.end > end {
1218                         end = h.end
1219                 }
1220                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1221         }
1222 }
1223
1224 func (t *Torrent) maybeNewConns() {
1225         // Tickle the accept routine.
1226         t.cl.event.Broadcast()
1227         t.openNewConns()
1228 }
1229
1230 func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason string) {
1231         if t._pendingPieces.Contains(uint32(piece)) {
1232                 t.iterPeers(func(c *Peer) {
1233                         // if c.requestState.Interested {
1234                         //      return
1235                         // }
1236                         if !c.isLowOnRequests() {
1237                                 return
1238                         }
1239                         if !c.peerHasPiece(piece) {
1240                                 return
1241                         }
1242                         if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
1243                                 return
1244                         }
1245                         c.updateRequests(reason)
1246                 })
1247         }
1248         t.maybeNewConns()
1249         t.publishPieceStateChange(piece)
1250 }
1251
1252 func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) {
1253         if !t.closed.IsSet() {
1254                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1255                 // request order.
1256                 t.updatePieceRequestOrderPiece(piece)
1257         }
1258         p := &t.pieces[piece]
1259         newPrio := p.uncachedPriority()
1260         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1261         if newPrio == PiecePriorityNone {
1262                 return t._pendingPieces.CheckedRemove(uint32(piece))
1263         } else {
1264                 return t._pendingPieces.CheckedAdd(uint32(piece))
1265         }
1266 }
1267
1268 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1269         if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
1270                 t.onPiecePendingTriggers(piece, reason)
1271         }
1272         t.updatePieceRequestOrderPiece(piece)
1273 }
1274
1275 func (t *Torrent) updateAllPiecePriorities(reason string) {
1276         t.updatePiecePriorities(0, t.numPieces(), reason)
1277 }
1278
1279 // Update all piece priorities in one hit. This function should have the same
1280 // output as updatePiecePriority, but across all pieces.
1281 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1282         for i := begin; i < end; i++ {
1283                 t.updatePiecePriority(i, reason)
1284         }
1285 }
1286
1287 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1288 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1289         if off >= t.length() {
1290                 return
1291         }
1292         if off < 0 {
1293                 size += off
1294                 off = 0
1295         }
1296         if size <= 0 {
1297                 return
1298         }
1299         begin = pieceIndex(off / t.info.PieceLength)
1300         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1301         if end > pieceIndex(t.info.NumPieces()) {
1302                 end = pieceIndex(t.info.NumPieces())
1303         }
1304         return
1305 }
1306
1307 // Returns true if all iterations complete without breaking. Returns the read regions for all
1308 // readers. The reader regions should not be merged as some callers depend on this method to
1309 // enumerate readers.
1310 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1311         for r := range t.readers {
1312                 p := r.pieces
1313                 if p.begin >= p.end {
1314                         continue
1315                 }
1316                 if !f(p.begin, p.end) {
1317                         return false
1318                 }
1319         }
1320         return true
1321 }
1322
1323 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1324         return t.piece(piece).uncachedPriority()
1325 }
1326
1327 func (t *Torrent) pendRequest(req RequestIndex) {
1328         t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
1329 }
1330
1331 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1332         t.cl.event.Broadcast()
1333         if t.pieceComplete(piece) {
1334                 t.onPieceCompleted(piece)
1335         } else {
1336                 t.onIncompletePiece(piece)
1337         }
1338         t.updatePiecePriority(piece, reason)
1339 }
1340
1341 func (t *Torrent) numReceivedConns() (ret int) {
1342         for c := range t.conns {
1343                 if c.Discovery == PeerSourceIncoming {
1344                         ret++
1345                 }
1346         }
1347         return
1348 }
1349
1350 func (t *Torrent) numOutgoingConns() (ret int) {
1351         for c := range t.conns {
1352                 if c.outgoing {
1353                         ret++
1354                 }
1355         }
1356         return
1357 }
1358
1359 func (t *Torrent) maxHalfOpen() int {
1360         // Note that if we somehow exceed the maximum established conns, we want
1361         // the negative value to have an effect.
1362         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1363         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1364         // We want to allow some experimentation with new peers, and to try to
1365         // upset an oversupply of received connections.
1366         return int(min(
1367                 max(5, extraIncoming)+establishedHeadroom,
1368                 int64(t.cl.config.HalfOpenConnsPerTorrent),
1369         ))
1370 }
1371
1372 func (t *Torrent) openNewConns() (initiated int) {
1373         defer t.updateWantPeersEvent()
1374         for t.peers.Len() != 0 {
1375                 if !t.wantOutgoingConns() {
1376                         return
1377                 }
1378                 if len(t.halfOpen) >= t.maxHalfOpen() {
1379                         return
1380                 }
1381                 if len(t.cl.dialers) == 0 {
1382                         return
1383                 }
1384                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1385                         return
1386                 }
1387                 p := t.peers.PopMax()
1388                 opts := outgoingConnOpts{
1389                         peerInfo:                 p,
1390                         t:                        t,
1391                         requireRendezvous:        false,
1392                         skipHolepunchRendezvous:  false,
1393                         receivedHolepunchConnect: false,
1394                         HeaderObfuscationPolicy:  t.cl.config.HeaderObfuscationPolicy,
1395                 }
1396                 initiateConn(opts, false)
1397                 initiated++
1398         }
1399         return
1400 }
1401
1402 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1403         p := t.piece(piece)
1404         uncached := t.pieceCompleteUncached(piece)
1405         cached := p.completion()
1406         changed := cached != uncached
1407         complete := uncached.Complete
1408         p.storageCompletionOk = uncached.Ok
1409         x := uint32(piece)
1410         if complete {
1411                 t._completedPieces.Add(x)
1412                 t.openNewConns()
1413         } else {
1414                 t._completedPieces.Remove(x)
1415         }
1416         p.t.updatePieceRequestOrderPiece(piece)
1417         t.updateComplete()
1418         if complete && len(p.dirtiers) != 0 {
1419                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1420         }
1421         if changed {
1422                 //slog.Debug(
1423                 //      "piece completion changed",
1424                 //      slog.Int("piece", piece),
1425                 //      slog.Any("from", cached),
1426                 //      slog.Any("to", uncached))
1427                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1428         }
1429         return changed
1430 }
1431
1432 // Non-blocking read. Client lock is not required.
1433 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1434         for len(b) != 0 {
1435                 p := &t.pieces[off/t.info.PieceLength]
1436                 p.waitNoPendingWrites()
1437                 var n1 int
1438                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1439                 if n1 == 0 {
1440                         break
1441                 }
1442                 off += int64(n1)
1443                 n += n1
1444                 b = b[n1:]
1445         }
1446         return
1447 }
1448
1449 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1450 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1451 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1452 func (t *Torrent) maybeCompleteMetadata() error {
1453         if t.haveInfo() {
1454                 // Nothing to do.
1455                 return nil
1456         }
1457         if !t.haveAllMetadataPieces() {
1458                 // Don't have enough metadata pieces.
1459                 return nil
1460         }
1461         err := t.setInfoBytesLocked(t.metadataBytes)
1462         if err != nil {
1463                 t.invalidateMetadata()
1464                 return fmt.Errorf("error setting info bytes: %s", err)
1465         }
1466         if t.cl.config.Debug {
1467                 t.logger.Printf("%s: got metadata from peers", t)
1468         }
1469         return nil
1470 }
1471
1472 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1473         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1474                 if end > begin {
1475                         now.Add(bitmap.BitIndex(begin))
1476                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1477                 }
1478                 return true
1479         })
1480         return
1481 }
1482
1483 func (t *Torrent) needData() bool {
1484         if t.closed.IsSet() {
1485                 return false
1486         }
1487         if !t.haveInfo() {
1488                 return true
1489         }
1490         return !t._pendingPieces.IsEmpty()
1491 }
1492
1493 func appendMissingStrings(old, new []string) (ret []string) {
1494         ret = old
1495 new:
1496         for _, n := range new {
1497                 for _, o := range old {
1498                         if o == n {
1499                                 continue new
1500                         }
1501                 }
1502                 ret = append(ret, n)
1503         }
1504         return
1505 }
1506
1507 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1508         ret = existing
1509         for minNumTiers > len(ret) {
1510                 ret = append(ret, nil)
1511         }
1512         return
1513 }
1514
1515 func (t *Torrent) addTrackers(announceList [][]string) {
1516         fullAnnounceList := &t.metainfo.AnnounceList
1517         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1518         for tierIndex, trackerURLs := range announceList {
1519                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1520         }
1521         t.startMissingTrackerScrapers()
1522         t.updateWantPeersEvent()
1523 }
1524
1525 // Don't call this before the info is available.
1526 func (t *Torrent) bytesCompleted() int64 {
1527         if !t.haveInfo() {
1528                 return 0
1529         }
1530         return t.length() - t.bytesLeft()
1531 }
1532
1533 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1534         t.cl.lock()
1535         defer t.cl.unlock()
1536         return t.setInfoBytesLocked(b)
1537 }
1538
1539 // Returns true if connection is removed from torrent.Conns.
1540 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1541         if !c.closed.IsSet() {
1542                 panic("connection is not closed")
1543                 // There are behaviours prevented by the closed state that will fail
1544                 // if the connection has been deleted.
1545         }
1546         _, ret = t.conns[c]
1547         delete(t.conns, c)
1548         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1549         // the drop event against the PexConnState instead.
1550         if ret {
1551                 if !t.cl.config.DisablePEX {
1552                         t.pex.Drop(c)
1553                 }
1554         }
1555         torrent.Add("deleted connections", 1)
1556         c.deleteAllRequests("Torrent.deletePeerConn")
1557         t.assertPendingRequests()
1558         if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
1559                 panic(t.connsWithAllPieces)
1560         }
1561         return
1562 }
1563
1564 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1565         if t.deleteConnWithAllPieces(p) {
1566                 return
1567         }
1568         if !t.haveInfo() {
1569                 return
1570         }
1571         p.peerPieces().Iterate(func(i uint32) bool {
1572                 p.t.decPieceAvailability(pieceIndex(i))
1573                 return true
1574         })
1575 }
1576
1577 func (t *Torrent) assertPendingRequests() {
1578         if !check.Enabled {
1579                 return
1580         }
1581         // var actual pendingRequests
1582         // if t.haveInfo() {
1583         //      actual.m = make([]int, t.numChunks())
1584         // }
1585         // t.iterPeers(func(p *Peer) {
1586         //      p.requestState.Requests.Iterate(func(x uint32) bool {
1587         //              actual.Inc(x)
1588         //              return true
1589         //      })
1590         // })
1591         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1592         // if diff != "" {
1593         //      panic(diff)
1594         // }
1595 }
1596
1597 func (t *Torrent) dropConnection(c *PeerConn) {
1598         t.cl.event.Broadcast()
1599         c.close()
1600         if t.deletePeerConn(c) {
1601                 t.openNewConns()
1602         }
1603 }
1604
1605 // Peers as in contact information for dialing out.
1606 func (t *Torrent) wantPeers() bool {
1607         if t.closed.IsSet() {
1608                 return false
1609         }
1610         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1611                 return false
1612         }
1613         return t.wantOutgoingConns()
1614 }
1615
1616 func (t *Torrent) updateWantPeersEvent() {
1617         if t.wantPeers() {
1618                 t.wantPeersEvent.Set()
1619         } else {
1620                 t.wantPeersEvent.Clear()
1621         }
1622 }
1623
1624 // Returns whether the client should make effort to seed the torrent.
1625 func (t *Torrent) seeding() bool {
1626         cl := t.cl
1627         if t.closed.IsSet() {
1628                 return false
1629         }
1630         if t.dataUploadDisallowed {
1631                 return false
1632         }
1633         if cl.config.NoUpload {
1634                 return false
1635         }
1636         if !cl.config.Seed {
1637                 return false
1638         }
1639         if cl.config.DisableAggressiveUpload && t.needData() {
1640                 return false
1641         }
1642         return true
1643 }
1644
1645 func (t *Torrent) onWebRtcConn(
1646         c datachannel.ReadWriteCloser,
1647         dcc webtorrent.DataChannelContext,
1648 ) {
1649         defer c.Close()
1650         netConn := webrtcNetConn{
1651                 ReadWriteCloser:    c,
1652                 DataChannelContext: dcc,
1653         }
1654         peerRemoteAddr := netConn.RemoteAddr()
1655         //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
1656         if t.cl.badPeerAddr(peerRemoteAddr) {
1657                 return
1658         }
1659         localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
1660         pc, err := t.cl.initiateProtocolHandshakes(
1661                 context.Background(),
1662                 netConn,
1663                 t,
1664                 false,
1665                 newConnectionOpts{
1666                         outgoing:        dcc.LocalOffered,
1667                         remoteAddr:      peerRemoteAddr,
1668                         localPublicAddr: localAddrIpPort,
1669                         network:         webrtcNetwork,
1670                         connString:      fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
1671                 },
1672         )
1673         if err != nil {
1674                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1675                 return
1676         }
1677         if dcc.LocalOffered {
1678                 pc.Discovery = PeerSourceTracker
1679         } else {
1680                 pc.Discovery = PeerSourceIncoming
1681         }
1682         pc.conn.SetWriteDeadline(time.Time{})
1683         t.cl.lock()
1684         defer t.cl.unlock()
1685         err = t.runHandshookConn(pc)
1686         if err != nil {
1687                 t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
1688         }
1689 }
1690
1691 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1692         err := t.runHandshookConn(pc)
1693         if err != nil || logAll {
1694                 t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
1695         }
1696 }
1697
1698 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1699         t.logRunHandshookConn(pc, false, log.Debug)
1700 }
1701
1702 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1703         wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash)
1704         // This needs to run before the Torrent is dropped from the Client, to prevent a new webtorrent.TrackerClient for
1705         // the same info hash before the old one is cleaned up.
1706         t.onClose = append(t.onClose, release)
1707         wst := websocketTrackerStatus{u, wtc}
1708         go func() {
1709                 err := wtc.Announce(tracker.Started, t.infoHash)
1710                 if err != nil {
1711                         t.logger.WithDefaultLevel(log.Warning).Printf(
1712                                 "error in initial announce to %q: %v",
1713                                 u.String(), err,
1714                         )
1715                 }
1716         }()
1717         return wst
1718 }
1719
1720 func (t *Torrent) startScrapingTracker(_url string) {
1721         if _url == "" {
1722                 return
1723         }
1724         u, err := url.Parse(_url)
1725         if err != nil {
1726                 // URLs with a leading '*' appear to be a uTorrent convention to disable trackers.
1727                 if _url[0] != '*' {
1728                         t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err)
1729                 }
1730                 return
1731         }
1732         if u.Scheme == "udp" {
1733                 u.Scheme = "udp4"
1734                 t.startScrapingTracker(u.String())
1735                 u.Scheme = "udp6"
1736                 t.startScrapingTracker(u.String())
1737                 return
1738         }
1739         if _, ok := t.trackerAnnouncers[_url]; ok {
1740                 return
1741         }
1742         sl := func() torrentTrackerAnnouncer {
1743                 switch u.Scheme {
1744                 case "ws", "wss":
1745                         if t.cl.config.DisableWebtorrent {
1746                                 return nil
1747                         }
1748                         return t.startWebsocketAnnouncer(*u)
1749                 case "udp4":
1750                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1751                                 return nil
1752                         }
1753                 case "udp6":
1754                         if t.cl.config.DisableIPv6 {
1755                                 return nil
1756                         }
1757                 }
1758                 newAnnouncer := &trackerScraper{
1759                         u:               *u,
1760                         t:               t,
1761                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1762                 }
1763                 go newAnnouncer.Run()
1764                 return newAnnouncer
1765         }()
1766         if sl == nil {
1767                 return
1768         }
1769         if t.trackerAnnouncers == nil {
1770                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1771         }
1772         t.trackerAnnouncers[_url] = sl
1773 }
1774
1775 // Adds and starts tracker scrapers for tracker URLs that aren't already
1776 // running.
1777 func (t *Torrent) startMissingTrackerScrapers() {
1778         if t.cl.config.DisableTrackers {
1779                 return
1780         }
1781         t.startScrapingTracker(t.metainfo.Announce)
1782         for _, tier := range t.metainfo.AnnounceList {
1783                 for _, url := range tier {
1784                         t.startScrapingTracker(url)
1785                 }
1786         }
1787 }
1788
1789 // Returns an AnnounceRequest with fields filled out to defaults and current
1790 // values.
1791 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1792         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1793         // dependent on the network in use.
1794         return tracker.AnnounceRequest{
1795                 Event: event,
1796                 NumWant: func() int32 {
1797                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1798                                 return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764
1799                         } else {
1800                                 return 0
1801                         }
1802                 }(),
1803                 Port:     uint16(t.cl.incomingPeerPort()),
1804                 PeerId:   t.cl.peerID,
1805                 InfoHash: t.infoHash,
1806                 Key:      t.cl.announceKey(),
1807
1808                 // The following are vaguely described in BEP 3.
1809
1810                 Left:     t.bytesLeftAnnounce(),
1811                 Uploaded: t.stats.BytesWrittenData.Int64(),
1812                 // There's no mention of wasted or unwanted download in the BEP.
1813                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1814         }
1815 }
1816
1817 // Adds peers revealed in an announce until the announce ends, or we have
1818 // enough peers.
1819 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1820         cl := t.cl
1821         for v := range pvs {
1822                 cl.lock()
1823                 added := 0
1824                 for _, cp := range v.Peers {
1825                         if cp.Port == 0 {
1826                                 // Can't do anything with this.
1827                                 continue
1828                         }
1829                         if t.addPeer(PeerInfo{
1830                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1831                                 Source: PeerSourceDhtGetPeers,
1832                         }) {
1833                                 added++
1834                         }
1835                 }
1836                 cl.unlock()
1837                 // if added != 0 {
1838                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1839                 // }
1840         }
1841 }
1842
1843 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1844 // announce ends. stop will force the announce to end.
1845 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1846         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1847         if err != nil {
1848                 return
1849         }
1850         _done := make(chan struct{})
1851         done = _done
1852         stop = ps.Close
1853         go func() {
1854                 t.consumeDhtAnnouncePeers(ps.Peers())
1855                 close(_done)
1856         }()
1857         return
1858 }
1859
1860 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1861         _, stop, err := t.AnnounceToDht(s)
1862         if err != nil {
1863                 return err
1864         }
1865         select {
1866         case <-t.closed.Done():
1867         case <-time.After(5 * time.Minute):
1868         }
1869         stop()
1870         return nil
1871 }
1872
1873 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1874         cl := t.cl
1875         cl.lock()
1876         defer cl.unlock()
1877         for {
1878                 for {
1879                         if t.closed.IsSet() {
1880                                 return
1881                         }
1882                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
1883                         // TODO: We can include the announce_peer step depending on whether we can receive
1884                         // inbound connections. We should probably only announce once every 15 mins too.
1885                         if !t.wantAnyConns() {
1886                                 goto wait
1887                         }
1888                         // TODO: Determine if there's a listener on the port we're announcing.
1889                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1890                                 goto wait
1891                         }
1892                         break
1893                 wait:
1894                         cl.event.Wait()
1895                 }
1896                 func() {
1897                         t.numDHTAnnounces++
1898                         cl.unlock()
1899                         defer cl.lock()
1900                         err := t.timeboxedAnnounceToDht(s)
1901                         if err != nil {
1902                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1903                         }
1904                 }()
1905         }
1906 }
1907
1908 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1909         for _, p := range peers {
1910                 if t.addPeer(p) {
1911                         added++
1912                 }
1913         }
1914         return
1915 }
1916
1917 // The returned TorrentStats may require alignment in memory. See
1918 // https://github.com/anacrolix/torrent/issues/383.
1919 func (t *Torrent) Stats() TorrentStats {
1920         t.cl.rLock()
1921         defer t.cl.rUnlock()
1922         return t.statsLocked()
1923 }
1924
1925 func (t *Torrent) statsLocked() (ret TorrentStats) {
1926         ret.ActivePeers = len(t.conns)
1927         ret.HalfOpenPeers = len(t.halfOpen)
1928         ret.PendingPeers = t.peers.Len()
1929         ret.TotalPeers = t.numTotalPeers()
1930         ret.ConnectedSeeders = 0
1931         for c := range t.conns {
1932                 if all, ok := c.peerHasAllPieces(); all && ok {
1933                         ret.ConnectedSeeders++
1934                 }
1935         }
1936         ret.ConnStats = t.stats.Copy()
1937         ret.PiecesComplete = t.numPiecesCompleted()
1938         return
1939 }
1940
1941 // The total number of peers in the torrent.
1942 func (t *Torrent) numTotalPeers() int {
1943         peers := make(map[string]struct{})
1944         for conn := range t.conns {
1945                 ra := conn.conn.RemoteAddr()
1946                 if ra == nil {
1947                         // It's been closed and doesn't support RemoteAddr.
1948                         continue
1949                 }
1950                 peers[ra.String()] = struct{}{}
1951         }
1952         for addr := range t.halfOpen {
1953                 peers[addr] = struct{}{}
1954         }
1955         t.peers.Each(func(peer PeerInfo) {
1956                 peers[peer.Addr.String()] = struct{}{}
1957         })
1958         return len(peers)
1959 }
1960
1961 // Reconcile bytes transferred before connection was associated with a
1962 // torrent.
1963 func (t *Torrent) reconcileHandshakeStats(c *Peer) {
1964         if c._stats != (ConnStats{
1965                 // Handshakes should only increment these fields:
1966                 BytesWritten: c._stats.BytesWritten,
1967                 BytesRead:    c._stats.BytesRead,
1968         }) {
1969                 panic("bad stats")
1970         }
1971         c.postHandshakeStats(func(cs *ConnStats) {
1972                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1973                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1974         })
1975         c.reconciledHandshakeStats = true
1976 }
1977
1978 // Returns true if the connection is added.
1979 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1980         defer func() {
1981                 if err == nil {
1982                         torrent.Add("added connections", 1)
1983                 }
1984         }()
1985         if t.closed.IsSet() {
1986                 return errors.New("torrent closed")
1987         }
1988         for c0 := range t.conns {
1989                 if c.PeerID != c0.PeerID {
1990                         continue
1991                 }
1992                 if !t.cl.config.DropDuplicatePeerIds {
1993                         continue
1994                 }
1995                 if c.hasPreferredNetworkOver(c0) {
1996                         c0.close()
1997                         t.deletePeerConn(c0)
1998                 } else {
1999                         return errors.New("existing connection preferred")
2000                 }
2001         }
2002         if len(t.conns) >= t.maxEstablishedConns {
2003                 numOutgoing := t.numOutgoingConns()
2004                 numIncoming := len(t.conns) - numOutgoing
2005                 c := t.worstBadConn(worseConnLensOpts{
2006                         // We've already established that we have too many connections at this point, so we just
2007                         // need to match what kind we have too many of vs. what we're trying to add now.
2008                         incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
2009                         outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
2010                 })
2011                 if c == nil {
2012                         return errors.New("don't want conn")
2013                 }
2014                 c.close()
2015                 t.deletePeerConn(c)
2016         }
2017         if len(t.conns) >= t.maxEstablishedConns {
2018                 panic(len(t.conns))
2019         }
2020         t.conns[c] = struct{}{}
2021         t.cl.event.Broadcast()
2022         // We'll never receive the "p" extended handshake parameter.
2023         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
2024                 t.pex.Add(c)
2025         }
2026         return nil
2027 }
2028
2029 func (t *Torrent) newConnsAllowed() bool {
2030         if !t.networkingEnabled.Bool() {
2031                 return false
2032         }
2033         if t.closed.IsSet() {
2034                 return false
2035         }
2036         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2037                 return false
2038         }
2039         return true
2040 }
2041
2042 func (t *Torrent) wantAnyConns() bool {
2043         if !t.networkingEnabled.Bool() {
2044                 return false
2045         }
2046         if t.closed.IsSet() {
2047                 return false
2048         }
2049         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2050                 return false
2051         }
2052         return len(t.conns) < t.maxEstablishedConns
2053 }
2054
2055 func (t *Torrent) wantOutgoingConns() bool {
2056         if !t.newConnsAllowed() {
2057                 return false
2058         }
2059         if len(t.conns) < t.maxEstablishedConns {
2060                 return true
2061         }
2062         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2063         return t.worstBadConn(worseConnLensOpts{
2064                 incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
2065                 outgoingIsBad: false,
2066         }) != nil
2067 }
2068
2069 func (t *Torrent) wantIncomingConns() bool {
2070         if !t.newConnsAllowed() {
2071                 return false
2072         }
2073         if len(t.conns) < t.maxEstablishedConns {
2074                 return true
2075         }
2076         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2077         return t.worstBadConn(worseConnLensOpts{
2078                 incomingIsBad: false,
2079                 outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
2080         }) != nil
2081 }
2082
2083 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
2084         t.cl.lock()
2085         defer t.cl.unlock()
2086         oldMax = t.maxEstablishedConns
2087         t.maxEstablishedConns = max
2088         wcs := worseConnSlice{
2089                 conns: t.appendConns(nil, func(*PeerConn) bool {
2090                         return true
2091                 }),
2092         }
2093         wcs.initKeys(worseConnLensOpts{})
2094         heap.Init(&wcs)
2095         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
2096                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
2097         }
2098         t.openNewConns()
2099         return oldMax
2100 }
2101
2102 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
2103         t.logger.LazyLog(log.Debug, func() log.Msg {
2104                 return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
2105         })
2106         p := t.piece(piece)
2107         p.numVerifies++
2108         t.cl.event.Broadcast()
2109         if t.closed.IsSet() {
2110                 return
2111         }
2112
2113         // Don't score the first time a piece is hashed, it could be an initial check.
2114         if p.storageCompletionOk {
2115                 if passed {
2116                         pieceHashedCorrect.Add(1)
2117                 } else {
2118                         log.Fmsg(
2119                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
2120                         ).AddValues(t, p).LogLevel(
2121
2122                                 log.Debug, t.logger)
2123
2124                         pieceHashedNotCorrect.Add(1)
2125                 }
2126         }
2127
2128         p.marking = true
2129         t.publishPieceStateChange(piece)
2130         defer func() {
2131                 p.marking = false
2132                 t.publishPieceStateChange(piece)
2133         }()
2134
2135         if passed {
2136                 if len(p.dirtiers) != 0 {
2137                         // Don't increment stats above connection-level for every involved connection.
2138                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
2139                 }
2140                 for c := range p.dirtiers {
2141                         c._stats.incrementPiecesDirtiedGood()
2142                 }
2143                 t.clearPieceTouchers(piece)
2144                 hasDirty := p.hasDirtyChunks()
2145                 t.cl.unlock()
2146                 if hasDirty {
2147                         p.Flush() // You can be synchronous here!
2148                 }
2149                 err := p.Storage().MarkComplete()
2150                 if err != nil {
2151                         t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
2152                 }
2153                 t.cl.lock()
2154
2155                 if t.closed.IsSet() {
2156                         return
2157                 }
2158                 t.pendAllChunkSpecs(piece)
2159         } else {
2160                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
2161                         // Peers contributed to all the data for this piece hash failure, and the failure was
2162                         // not due to errors in the storage (such as data being dropped in a cache).
2163
2164                         // Increment Torrent and above stats, and then specific connections.
2165                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
2166                         for c := range p.dirtiers {
2167                                 // Y u do dis peer?!
2168                                 c.stats().incrementPiecesDirtiedBad()
2169                         }
2170
2171                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
2172                         for c := range p.dirtiers {
2173                                 if !c.trusted {
2174                                         bannableTouchers = append(bannableTouchers, c)
2175                                 }
2176                         }
2177                         t.clearPieceTouchers(piece)
2178                         slices.Sort(bannableTouchers, connLessTrusted)
2179
2180                         if t.cl.config.Debug {
2181                                 t.logger.Printf(
2182                                         "bannable conns by trust for piece %d: %v",
2183                                         piece,
2184                                         func() (ret []connectionTrust) {
2185                                                 for _, c := range bannableTouchers {
2186                                                         ret = append(ret, c.trust())
2187                                                 }
2188                                                 return
2189                                         }(),
2190                                 )
2191                         }
2192
2193                         if len(bannableTouchers) >= 1 {
2194                                 c := bannableTouchers[0]
2195                                 if len(bannableTouchers) != 1 {
2196                                         t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
2197                                 } else {
2198                                         // Turns out it's still useful to ban peers like this because if there's only a
2199                                         // single peer for a piece, and we never progress that piece to completion, we
2200                                         // will never smart-ban them. Discovered in
2201                                         // https://github.com/anacrolix/torrent/issues/715.
2202                                         t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
2203                                         c.ban()
2204                                 }
2205                         }
2206                 }
2207                 t.onIncompletePiece(piece)
2208                 p.Storage().MarkNotComplete()
2209         }
2210         t.updatePieceCompletion(piece)
2211 }
2212
2213 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
2214         start := t.pieceRequestIndexOffset(piece)
2215         end := start + t.pieceNumChunks(piece)
2216         for ri := start; ri < end; ri++ {
2217                 t.cancelRequest(ri)
2218         }
2219 }
2220
2221 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
2222         t.pendAllChunkSpecs(piece)
2223         t.cancelRequestsForPiece(piece)
2224         t.piece(piece).readerCond.Broadcast()
2225         for conn := range t.conns {
2226                 conn.have(piece)
2227                 t.maybeDropMutuallyCompletePeer(conn)
2228         }
2229 }
2230
2231 // Called when a piece is found to be not complete.
2232 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2233         if t.pieceAllDirty(piece) {
2234                 t.pendAllChunkSpecs(piece)
2235         }
2236         if !t.wantPieceIndex(piece) {
2237                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2238                 return
2239         }
2240         // We could drop any connections that we told we have a piece that we
2241         // don't here. But there's a test failure, and it seems clients don't care
2242         // if you request pieces that you already claim to have. Pruning bad
2243         // connections might just remove any connections that aren't treating us
2244         // favourably anyway.
2245
2246         // for c := range t.conns {
2247         //      if c.sentHave(piece) {
2248         //              c.drop()
2249         //      }
2250         // }
2251         t.iterPeers(func(conn *Peer) {
2252                 if conn.peerHasPiece(piece) {
2253                         conn.updateRequests("piece incomplete")
2254                 }
2255         })
2256 }
2257
2258 func (t *Torrent) tryCreateMorePieceHashers() {
2259         for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
2260         }
2261 }
2262
2263 func (t *Torrent) tryCreatePieceHasher() bool {
2264         if t.storage == nil {
2265                 return false
2266         }
2267         pi, ok := t.getPieceToHash()
2268         if !ok {
2269                 return false
2270         }
2271         p := t.piece(pi)
2272         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2273         p.hashing = true
2274         t.publishPieceStateChange(pi)
2275         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2276         t.storageLock.RLock()
2277         t.activePieceHashes++
2278         go t.pieceHasher(pi)
2279         return true
2280 }
2281
2282 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2283         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2284                 if t.piece(i).hashing {
2285                         return true
2286                 }
2287                 ret = i
2288                 ok = true
2289                 return false
2290         })
2291         return
2292 }
2293
2294 func (t *Torrent) dropBannedPeers() {
2295         t.iterPeers(func(p *Peer) {
2296                 remoteIp := p.remoteIp()
2297                 if remoteIp == nil {
2298                         if p.bannableAddr.Ok {
2299                                 t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
2300                         }
2301                         return
2302                 }
2303                 netipAddr := netip.MustParseAddr(remoteIp.String())
2304                 if Some(netipAddr) != p.bannableAddr {
2305                         t.logger.WithDefaultLevel(log.Debug).Printf(
2306                                 "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
2307                                 p, remoteIp, p.bannableAddr)
2308                 }
2309                 if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
2310                         // Should this be a close?
2311                         p.drop()
2312                         t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
2313                 }
2314         })
2315 }
2316
2317 func (t *Torrent) pieceHasher(index pieceIndex) {
2318         p := t.piece(index)
2319         sum, failedPeers, copyErr := t.hashPiece(index)
2320         correct := sum == *p.hash
2321         switch copyErr {
2322         case nil, io.EOF:
2323         default:
2324                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2325         }
2326         t.storageLock.RUnlock()
2327         t.cl.lock()
2328         defer t.cl.unlock()
2329         if correct {
2330                 for peer := range failedPeers {
2331                         t.cl.banPeerIP(peer.AsSlice())
2332                         t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
2333                 }
2334                 t.dropBannedPeers()
2335                 for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
2336                         t.smartBanCache.ForgetBlock(ri)
2337                 }
2338         }
2339         p.hashing = false
2340         t.pieceHashed(index, correct, copyErr)
2341         t.updatePiecePriority(index, "Torrent.pieceHasher")
2342         t.activePieceHashes--
2343         t.tryCreateMorePieceHashers()
2344 }
2345
2346 // Return the connections that touched a piece, and clear the entries while doing it.
2347 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2348         p := t.piece(pi)
2349         for c := range p.dirtiers {
2350                 delete(c.peerTouchedPieces, pi)
2351                 delete(p.dirtiers, c)
2352         }
2353 }
2354
2355 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2356         t.iterPeers(func(p *Peer) {
2357                 ret = append(ret, p)
2358         })
2359         return
2360 }
2361
2362 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2363         piece := t.piece(pieceIndex)
2364         if piece.queuedForHash() {
2365                 return
2366         }
2367         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2368         t.publishPieceStateChange(pieceIndex)
2369         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2370         t.tryCreateMorePieceHashers()
2371 }
2372
2373 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2374 // before the Info is available.
2375 func (t *Torrent) VerifyData() {
2376         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2377                 t.Piece(i).VerifyData()
2378         }
2379 }
2380
2381 func (t *Torrent) connectingToPeerAddr(addrStr string) bool {
2382         return len(t.halfOpen[addrStr]) != 0
2383 }
2384
2385 func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool {
2386         addrStr := x.String()
2387         for c := range t.conns {
2388                 ra := c.RemoteAddr
2389                 if ra.String() == addrStr {
2390                         return true
2391                 }
2392         }
2393         return false
2394 }
2395
2396 func (t *Torrent) getHalfOpenPath(
2397         addrStr string,
2398         attemptKey outgoingConnAttemptKey,
2399 ) nestedmaps.Path[*PeerInfo] {
2400         return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey)
2401 }
2402
2403 func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) {
2404         path := t.getHalfOpenPath(addrStr, attemptKey)
2405         if path.Exists() {
2406                 panic("should be unique")
2407         }
2408         path.Set(attemptKey)
2409         t.cl.numHalfOpen++
2410 }
2411
2412 // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not
2413 // sure all the PeerInfo fields are being used.
2414 func initiateConn(
2415         opts outgoingConnOpts,
2416         ignoreLimits bool,
2417 ) {
2418         t := opts.t
2419         peer := opts.peerInfo
2420         if peer.Id == t.cl.peerID {
2421                 return
2422         }
2423         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2424                 return
2425         }
2426         addr := peer.Addr
2427         addrStr := addr.String()
2428         if !ignoreLimits {
2429                 if t.connectingToPeerAddr(addrStr) {
2430                         return
2431                 }
2432         }
2433         if t.hasPeerConnForAddr(addr) {
2434                 return
2435         }
2436         attemptKey := &peer
2437         t.addHalfOpen(addrStr, attemptKey)
2438         go t.cl.outgoingConnection(
2439                 opts,
2440                 attemptKey,
2441         )
2442 }
2443
2444 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2445 // quickly make one Client visible to the Torrent of another Client.
2446 func (t *Torrent) AddClientPeer(cl *Client) int {
2447         return t.AddPeers(func() (ps []PeerInfo) {
2448                 for _, la := range cl.ListenAddrs() {
2449                         ps = append(ps, PeerInfo{
2450                                 Addr:    la,
2451                                 Trusted: true,
2452                         })
2453                 }
2454                 return
2455         }())
2456 }
2457
2458 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2459 // connection.
2460 func (t *Torrent) allStats(f func(*ConnStats)) {
2461         f(&t.stats)
2462         f(&t.cl.connStats)
2463 }
2464
2465 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2466         return t.pieces[i].hashing
2467 }
2468
2469 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2470         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2471 }
2472
2473 func (t *Torrent) dialTimeout() time.Duration {
2474         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2475 }
2476
2477 func (t *Torrent) piece(i int) *Piece {
2478         return &t.pieces[i]
2479 }
2480
2481 func (t *Torrent) onWriteChunkErr(err error) {
2482         if t.userOnWriteChunkErr != nil {
2483                 go t.userOnWriteChunkErr(err)
2484                 return
2485         }
2486         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2487         t.disallowDataDownloadLocked()
2488 }
2489
2490 func (t *Torrent) DisallowDataDownload() {
2491         t.cl.lock()
2492         defer t.cl.unlock()
2493         t.disallowDataDownloadLocked()
2494 }
2495
2496 func (t *Torrent) disallowDataDownloadLocked() {
2497         t.dataDownloadDisallowed.Set()
2498         t.iterPeers(func(p *Peer) {
2499                 // Could check if peer request state is empty/not interested?
2500                 p.updateRequests("disallow data download")
2501                 p.cancelAllRequests()
2502         })
2503 }
2504
2505 func (t *Torrent) AllowDataDownload() {
2506         t.cl.lock()
2507         defer t.cl.unlock()
2508         t.dataDownloadDisallowed.Clear()
2509         t.iterPeers(func(p *Peer) {
2510                 p.updateRequests("allow data download")
2511         })
2512 }
2513
2514 // Enables uploading data, if it was disabled.
2515 func (t *Torrent) AllowDataUpload() {
2516         t.cl.lock()
2517         defer t.cl.unlock()
2518         t.dataUploadDisallowed = false
2519         t.iterPeers(func(p *Peer) {
2520                 p.updateRequests("allow data upload")
2521         })
2522 }
2523
2524 // Disables uploading data, if it was enabled.
2525 func (t *Torrent) DisallowDataUpload() {
2526         t.cl.lock()
2527         defer t.cl.unlock()
2528         t.dataUploadDisallowed = true
2529         for c := range t.conns {
2530                 // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
2531                 c.updateRequests("disallow data upload")
2532         }
2533 }
2534
2535 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2536 // or if nil, a critical message is logged, and data download is disabled.
2537 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2538         t.cl.lock()
2539         defer t.cl.unlock()
2540         t.userOnWriteChunkErr = f
2541 }
2542
2543 func (t *Torrent) iterPeers(f func(p *Peer)) {
2544         for pc := range t.conns {
2545                 f(&pc.Peer)
2546         }
2547         for _, ws := range t.webSeeds {
2548                 f(ws)
2549         }
2550 }
2551
2552 func (t *Torrent) callbacks() *Callbacks {
2553         return &t.cl.config.Callbacks
2554 }
2555
2556 type AddWebSeedsOpt func(*webseed.Client)
2557
2558 // Sets the WebSeed trailing path escaper for a webseed.Client.
2559 func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
2560         return func(c *webseed.Client) {
2561                 c.PathEscaper = custom
2562         }
2563 }
2564
2565 func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
2566         t.cl.lock()
2567         defer t.cl.unlock()
2568         for _, u := range urls {
2569                 t.addWebSeed(u, opts...)
2570         }
2571 }
2572
2573 func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
2574         if t.cl.config.DisableWebseeds {
2575                 return
2576         }
2577         if _, ok := t.webSeeds[url]; ok {
2578                 return
2579         }
2580         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2581         // right away. This value should be some multiple of the number of connections to a host. I
2582         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2583         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2584         // "https://webtorrent.io/torrents/".
2585         const maxRequests = 16
2586         ws := webseedPeer{
2587                 peer: Peer{
2588                         t:                        t,
2589                         outgoing:                 true,
2590                         Network:                  "http",
2591                         reconciledHandshakeStats: true,
2592                         // This should affect how often we have to recompute requests for this peer. Note that
2593                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2594                         // requests mark more often, so recomputation is probably sooner than with regular peer
2595                         // conns. ~4x maxRequests would be about right.
2596                         PeerMaxRequests: 128,
2597                         // TODO: Set ban prefix?
2598                         RemoteAddr: remoteAddrFromUrl(url),
2599                         callbacks:  t.callbacks(),
2600                 },
2601                 client: webseed.Client{
2602                         HttpClient: t.cl.httpClient,
2603                         Url:        url,
2604                         ResponseBodyWrapper: func(r io.Reader) io.Reader {
2605                                 return &rateLimitedReader{
2606                                         l: t.cl.config.DownloadRateLimiter,
2607                                         r: r,
2608                                 }
2609                         },
2610                 },
2611                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2612         }
2613         ws.peer.initRequestState()
2614         for _, opt := range opts {
2615                 opt(&ws.client)
2616         }
2617         ws.peer.initUpdateRequestsTimer()
2618         ws.requesterCond.L = t.cl.locker()
2619         for i := 0; i < maxRequests; i += 1 {
2620                 go ws.requester(i)
2621         }
2622         for _, f := range t.callbacks().NewPeer {
2623                 f(&ws.peer)
2624         }
2625         ws.peer.logger = t.logger.WithContextValue(&ws)
2626         ws.peer.peerImpl = &ws
2627         if t.haveInfo() {
2628                 ws.onGotInfo(t.info)
2629         }
2630         t.webSeeds[url] = &ws.peer
2631 }
2632
2633 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2634         t.iterPeers(func(p1 *Peer) {
2635                 if p1 == p {
2636                         active = true
2637                 }
2638         })
2639         return
2640 }
2641
2642 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2643         index := t.pieceIndexOfRequestIndex(ri)
2644         return Request{
2645                 pp.Integer(index),
2646                 t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2647         }
2648 }
2649
2650 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2651         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
2652 }
2653
2654 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2655         return RequestIndex(piece) * t.chunksPerRegularPiece()
2656 }
2657
2658 func (t *Torrent) updateComplete() {
2659         t.Complete.SetBool(t.haveAllPieces())
2660 }
2661
2662 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
2663         p := t.requestingPeer(r)
2664         if p != nil {
2665                 p.cancel(r)
2666         }
2667         // TODO: This is a check that an old invariant holds. It can be removed after some testing.
2668         //delete(t.pendingRequests, r)
2669         if _, ok := t.requestState[r]; ok {
2670                 panic("expected request state to be gone")
2671         }
2672         return p
2673 }
2674
2675 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
2676         return t.requestState[r].peer
2677 }
2678
2679 func (t *Torrent) addConnWithAllPieces(p *Peer) {
2680         if t.connsWithAllPieces == nil {
2681                 t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
2682         }
2683         t.connsWithAllPieces[p] = struct{}{}
2684 }
2685
2686 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
2687         _, ok := t.connsWithAllPieces[p]
2688         delete(t.connsWithAllPieces, p)
2689         return ok
2690 }
2691
2692 func (t *Torrent) numActivePeers() int {
2693         return len(t.conns) + len(t.webSeeds)
2694 }
2695
2696 func (t *Torrent) hasStorageCap() bool {
2697         f := t.storage.Capacity
2698         if f == nil {
2699                 return false
2700         }
2701         _, ok := (*f)()
2702         return ok
2703 }
2704
2705 func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
2706         return pieceIndex(ri / t.chunksPerRegularPiece())
2707 }
2708
2709 func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
2710         reuseIter *typedRoaring.Iterator[RequestIndex],
2711         piece pieceIndex,
2712         f func(RequestIndex),
2713 ) {
2714         reuseIter.Initialize(&t.dirtyChunks)
2715         pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
2716         iterBitmapUnsetInRange(
2717                 reuseIter,
2718                 pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
2719                 f,
2720         )
2721 }
2722
2723 type requestState struct {
2724         peer *Peer
2725         when time.Time
2726 }
2727
2728 // Returns an error if a received chunk is out of bounds in someway.
2729 func (t *Torrent) checkValidReceiveChunk(r Request) error {
2730         if !t.haveInfo() {
2731                 return errors.New("torrent missing info")
2732         }
2733         if int(r.Index) >= t.numPieces() {
2734                 return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
2735         }
2736         pieceLength := t.pieceLength(pieceIndex(r.Index))
2737         if r.Begin >= pieceLength {
2738                 return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
2739         }
2740         // We could check chunk lengths here, but chunk request size is not changed often, and tricky
2741         // for peers to manipulate as they need to send potentially large buffers to begin with. There
2742         // should be considerable checks elsewhere for this case due to the network overhead. We should
2743         // catch most of the overflow manipulation stuff by checking index and begin above.
2744         return nil
2745 }
2746
2747 func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
2748         for pc := range t.conns {
2749                 dialAddr, err := pc.remoteDialAddrPort()
2750                 if err != nil {
2751                         continue
2752                 }
2753                 if dialAddr != target {
2754                         continue
2755                 }
2756                 ret = append(ret, pc)
2757         }
2758         return
2759 }
2760
2761 func wrapUtHolepunchMsgForPeerConn(
2762         recipient *PeerConn,
2763         msg utHolepunch.Msg,
2764 ) pp.Message {
2765         extendedPayload, err := msg.MarshalBinary()
2766         if err != nil {
2767                 panic(err)
2768         }
2769         return pp.Message{
2770                 Type:            pp.Extended,
2771                 ExtendedID:      MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName),
2772                 ExtendedPayload: extendedPayload,
2773         }
2774 }
2775
2776 func sendUtHolepunchMsg(
2777         pc *PeerConn,
2778         msgType utHolepunch.MsgType,
2779         addrPort netip.AddrPort,
2780         errCode utHolepunch.ErrCode,
2781 ) {
2782         holepunchMsg := utHolepunch.Msg{
2783                 MsgType:  msgType,
2784                 AddrPort: addrPort,
2785                 ErrCode:  errCode,
2786         }
2787         incHolepunchMessagesSent(holepunchMsg)
2788         ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg)
2789         pc.write(ppMsg)
2790 }
2791
2792 func incHolepunchMessages(msg utHolepunch.Msg, verb string) {
2793         torrent.Add(
2794                 fmt.Sprintf(
2795                         "holepunch %v %v messages %v",
2796                         msg.MsgType,
2797                         addrPortProtocolStr(msg.AddrPort),
2798                         verb,
2799                 ),
2800                 1,
2801         )
2802 }
2803
2804 func incHolepunchMessagesReceived(msg utHolepunch.Msg) {
2805         incHolepunchMessages(msg, "received")
2806 }
2807
2808 func incHolepunchMessagesSent(msg utHolepunch.Msg) {
2809         incHolepunchMessages(msg, "sent")
2810 }
2811
2812 func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error {
2813         incHolepunchMessagesReceived(msg)
2814         switch msg.MsgType {
2815         case utHolepunch.Rendezvous:
2816                 t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
2817                 sendMsg := sendUtHolepunchMsg
2818                 senderAddrPort, err := sender.remoteDialAddrPort()
2819                 if err != nil {
2820                         sender.logger.Levelf(
2821                                 log.Warning,
2822                                 "error getting ut_holepunch rendezvous sender's dial address: %v",
2823                                 err,
2824                         )
2825                         // There's no better error code. The sender's address itself is invalid. I don't see
2826                         // this error message being appropriate anywhere else anyway.
2827                         sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
2828                 }
2829                 targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
2830                 if len(targets) == 0 {
2831                         sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
2832                         return nil
2833                 }
2834                 for _, pc := range targets {
2835                         if !pc.supportsExtension(utHolepunch.ExtensionName) {
2836                                 sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport)
2837                                 continue
2838                         }
2839                         sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
2840                         sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
2841                 }
2842                 return nil
2843         case utHolepunch.Connect:
2844                 holepunchAddr := msg.AddrPort
2845                 t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender)
2846                 if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) {
2847                         setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr)
2848                         if g.MapContains(t.cl.accepted, holepunchAddr) {
2849                                 setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
2850                         }
2851                 }
2852                 opts := outgoingConnOpts{
2853                         peerInfo: PeerInfo{
2854                                 Addr:         msg.AddrPort,
2855                                 Source:       PeerSourceUtHolepunch,
2856                                 PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(),
2857                         },
2858                         t: t,
2859                         // Don't attempt to start our own rendezvous if we fail to connect.
2860                         skipHolepunchRendezvous:  true,
2861                         receivedHolepunchConnect: true,
2862                         // Assume that the other end initiated the rendezvous, and will use our preferred
2863                         // encryption. So we will act normally.
2864                         HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
2865                 }
2866                 initiateConn(opts, true)
2867                 return nil
2868         case utHolepunch.Error:
2869                 torrent.Add("holepunch error messages received", 1)
2870                 t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
2871                 return nil
2872         default:
2873                 return fmt.Errorf("unhandled msg type %v", msg.MsgType)
2874         }
2875 }
2876
2877 func addrPortProtocolStr(addrPort netip.AddrPort) string {
2878         addr := addrPort.Addr()
2879         switch {
2880         case addr.Is4():
2881                 return "ipv4"
2882         case addr.Is6():
2883                 return "ipv6"
2884         default:
2885                 panic(addrPort)
2886         }
2887 }
2888
2889 func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error {
2890         rzsSent := 0
2891         for pc := range t.conns {
2892                 if !pc.supportsExtension(utHolepunch.ExtensionName) {
2893                         continue
2894                 }
2895                 if pc.supportsExtension(pp.ExtensionNamePex) {
2896                         if !g.MapContains(pc.pex.remoteLiveConns, addrPort) {
2897                                 continue
2898                         }
2899                 }
2900                 t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort)
2901                 sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0)
2902                 rzsSent++
2903         }
2904         if rzsSent == 0 {
2905                 return errors.New("no eligible relays")
2906         }
2907         return nil
2908 }
2909
2910 func (t *Torrent) numHalfOpenAttempts() (num int) {
2911         for _, attempts := range t.halfOpen {
2912                 num += len(attempts)
2913         }
2914         return
2915 }
2916
2917 func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
2918         cl := t.cl
2919         cl.rLock()
2920         defer cl.rUnlock()
2921         return t.dialTimeout()
2922 }