]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Note about speeding possum storage up
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "hash"
11         "io"
12         "math/rand"
13         "net/netip"
14         "net/url"
15         "sort"
16         "strings"
17         "text/tabwriter"
18         "time"
19         "unsafe"
20
21         "github.com/RoaringBitmap/roaring"
22         "github.com/anacrolix/chansync"
23         "github.com/anacrolix/chansync/events"
24         "github.com/anacrolix/dht/v2"
25         . "github.com/anacrolix/generics"
26         g "github.com/anacrolix/generics"
27         "github.com/anacrolix/log"
28         "github.com/anacrolix/missinggo/slices"
29         "github.com/anacrolix/missinggo/v2"
30         "github.com/anacrolix/missinggo/v2/bitmap"
31         "github.com/anacrolix/missinggo/v2/pubsub"
32         "github.com/anacrolix/multiless"
33         "github.com/anacrolix/sync"
34         "github.com/pion/datachannel"
35         "golang.org/x/exp/maps"
36         "golang.org/x/sync/errgroup"
37
38         "github.com/anacrolix/torrent/bencode"
39         "github.com/anacrolix/torrent/internal/check"
40         "github.com/anacrolix/torrent/internal/nestedmaps"
41         "github.com/anacrolix/torrent/merkle"
42         "github.com/anacrolix/torrent/metainfo"
43         pp "github.com/anacrolix/torrent/peer_protocol"
44         utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
45         request_strategy "github.com/anacrolix/torrent/request-strategy"
46         "github.com/anacrolix/torrent/storage"
47         "github.com/anacrolix/torrent/tracker"
48         typedRoaring "github.com/anacrolix/torrent/typed-roaring"
49         "github.com/anacrolix/torrent/types/infohash"
50         infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
51         "github.com/anacrolix/torrent/webseed"
52         "github.com/anacrolix/torrent/webtorrent"
53 )
54
55 // Maintains state of torrent within a Client. Many methods should not be called before the info is
56 // available, see .Info and .GotInfo.
57 type Torrent struct {
58         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
59         // alignment. See #262.
60         stats  ConnStats
61         cl     *Client
62         logger log.Logger
63
64         networkingEnabled      chansync.Flag
65         dataDownloadDisallowed chansync.Flag
66         dataUploadDisallowed   bool
67         userOnWriteChunkErr    func(error)
68
69         closed  chansync.SetOnce
70         onClose []func()
71
72         infoHash   g.Option[metainfo.Hash]
73         infoHashV2 g.Option[infohash_v2.T]
74
75         pieces []Piece
76
77         // The order pieces are requested if there's no stronger reason like availability or priority.
78         pieceRequestOrder []int
79         // Values are the piece indices that changed.
80         pieceStateChanges pubsub.PubSub[PieceStateChange]
81         // The size of chunks to request from peers over the wire. This is
82         // normally 16KiB by convention these days.
83         chunkSize pp.Integer
84         chunkPool sync.Pool
85         // Total length of the torrent in bytes. Stored because it's not O(1) to
86         // get this from the info dict.
87         _length Option[int64]
88
89         // The storage to open when the info dict becomes available.
90         storageOpener *storage.Client
91         // Storage for torrent data.
92         storage *storage.Torrent
93         // Read-locked for using storage, and write-locked for Closing.
94         storageLock sync.RWMutex
95
96         // TODO: Only announce stuff is used?
97         metainfo metainfo.MetaInfo
98
99         // The info dict. nil if we don't have it (yet).
100         info  *metainfo.Info
101         files *[]*File
102
103         _chunksPerRegularPiece chunkIndexType
104
105         webSeeds map[string]*Peer
106         // Active peer connections, running message stream loops. TODO: Make this
107         // open (not-closed) connections only.
108         conns               map[*PeerConn]struct{}
109         maxEstablishedConns int
110         // Set of addrs to which we're attempting to connect. Connections are
111         // half-open until all handshakes are completed.
112         halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo
113
114         // Reserve of peers to connect to. A peer can be both here and in the
115         // active connections if were told about the peer after connecting with
116         // them. That encourages us to reconnect to peers that are well known in
117         // the swarm.
118         peers prioritizedPeers
119         // Whether we want to know more peers.
120         wantPeersEvent missinggo.Event
121         // An announcer for each tracker URL.
122         trackerAnnouncers map[torrentTrackerAnnouncerKey]torrentTrackerAnnouncer
123         // How many times we've initiated a DHT announce. TODO: Move into stats.
124         numDHTAnnounces int
125
126         // Name used if the info name isn't available. Should be cleared when the
127         // Info does become available.
128         nameMu      sync.RWMutex
129         displayName string
130
131         // The bencoded bytes of the info dict. This is actively manipulated if
132         // the info bytes aren't initially available, and we try to fetch them
133         // from peers.
134         metadataBytes []byte
135         // Each element corresponds to the 16KiB metadata pieces. If true, we have
136         // received that piece.
137         metadataCompletedChunks []bool
138         metadataChanged         sync.Cond
139
140         // Closed when .Info is obtained.
141         gotMetainfoC chan struct{}
142
143         readers                map[*reader]struct{}
144         _readerNowPieces       bitmap.Bitmap
145         _readerReadaheadPieces bitmap.Bitmap
146
147         // A cache of pieces we need to get. Calculated from various piece and
148         // file priorities and completion states elsewhere.
149         _pendingPieces roaring.Bitmap
150         // A cache of completed piece indices.
151         _completedPieces roaring.Bitmap
152         // Pieces that need to be hashed.
153         piecesQueuedForHash       bitmap.Bitmap
154         activePieceHashes         int
155         initialPieceCheckDisabled bool
156
157         connsWithAllPieces map[*Peer]struct{}
158
159         requestState map[RequestIndex]requestState
160         // Chunks we've written to since the corresponding piece was last checked.
161         dirtyChunks typedRoaring.Bitmap[RequestIndex]
162
163         pex pexState
164
165         // Is On when all pieces are complete.
166         Complete chansync.Flag
167
168         // Torrent sources in use keyed by the source string.
169         activeSources sync.Map
170         sourcesLogger log.Logger
171
172         smartBanCache smartBanCache
173
174         // Large allocations reused between request state updates.
175         requestPieceStates []request_strategy.PieceRequestOrderState
176         requestIndexes     []RequestIndex
177
178         disableTriggers bool
179 }
180
181 type torrentTrackerAnnouncerKey struct {
182         shortInfohash [20]byte
183         url           string
184 }
185
186 type outgoingConnAttemptKey = *PeerInfo
187
188 func (t *Torrent) length() int64 {
189         return t._length.Value
190 }
191
192 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
193         // This could be done with roaring.BitSliceIndexing.
194         t.iterPeers(func(peer *Peer) {
195                 if _, ok := t.connsWithAllPieces[peer]; ok {
196                         return
197                 }
198                 if peer.peerHasPiece(i) {
199                         count++
200                 }
201         })
202         return
203 }
204
205 func (t *Torrent) decPieceAvailability(i pieceIndex) {
206         if !t.haveInfo() {
207                 return
208         }
209         p := t.piece(i)
210         if p.relativeAvailability <= 0 {
211                 panic(p.relativeAvailability)
212         }
213         p.relativeAvailability--
214         t.updatePieceRequestOrderPiece(i)
215 }
216
217 func (t *Torrent) incPieceAvailability(i pieceIndex) {
218         // If we don't the info, this should be reconciled when we do.
219         if t.haveInfo() {
220                 p := t.piece(i)
221                 p.relativeAvailability++
222                 t.updatePieceRequestOrderPiece(i)
223         }
224 }
225
226 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
227         return t._readerNowPieces
228 }
229
230 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
231         return t._readerReadaheadPieces
232 }
233
234 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
235         return !t.wantPieceIndex(i)
236 }
237
238 // Returns a channel that is closed when the Torrent is closed.
239 func (t *Torrent) Closed() events.Done {
240         return t.closed.Done()
241 }
242
243 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
244 // pending, and half-open peers.
245 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
246         // Add pending peers to the list
247         t.peers.Each(func(peer PeerInfo) {
248                 ks = append(ks, peer)
249         })
250
251         // Add half-open peers to the list
252         for _, attempts := range t.halfOpen {
253                 for _, peer := range attempts {
254                         ks = append(ks, *peer)
255                 }
256         }
257
258         // Add active peers to the list
259         t.cl.rLock()
260         defer t.cl.rUnlock()
261         for conn := range t.conns {
262                 ks = append(ks, PeerInfo{
263                         Id:     conn.PeerID,
264                         Addr:   conn.RemoteAddr,
265                         Source: conn.Discovery,
266                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
267                         // > But if we're not connected to them with an encrypted connection, I couldn't say
268                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
269                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
270                         // > connection. It's probably easiest to do the latter for now.
271                         // https://github.com/anacrolix/torrent/pull/188
272                         SupportsEncryption: conn.headerEncrypted,
273                 })
274         }
275
276         return
277 }
278
279 func (t *Torrent) setChunkSize(size pp.Integer) {
280         t.chunkSize = size
281         t.chunkPool = sync.Pool{
282                 New: func() interface{} {
283                         b := make([]byte, size)
284                         return &b
285                 },
286         }
287 }
288
289 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
290         return t._completedPieces.Contains(bitmap.BitIndex(piece))
291 }
292
293 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
294         if t.storage == nil {
295                 return storage.Completion{Complete: false, Ok: true}
296         }
297         return t.pieces[piece].Storage().Completion()
298 }
299
300 // There's a connection to that address already.
301 func (t *Torrent) addrActive(addr string) bool {
302         if _, ok := t.halfOpen[addr]; ok {
303                 return true
304         }
305         for c := range t.conns {
306                 ra := c.RemoteAddr
307                 if ra.String() == addr {
308                         return true
309                 }
310         }
311         return false
312 }
313
314 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
315         return t.appendConns(ret, func(conn *PeerConn) bool {
316                 return !conn.closed.IsSet()
317         })
318 }
319
320 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
321         for c := range t.conns {
322                 if f(c) {
323                         ret = append(ret, c)
324                 }
325         }
326         return ret
327 }
328
329 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
330         cl := t.cl
331         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
332         if t.closed.IsSet() {
333                 return false
334         }
335         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
336                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
337                         torrent.Add("peers not added because of bad addr", 1)
338                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
339                         return false
340                 }
341         }
342         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
343                 torrent.Add("peers replaced", 1)
344                 if !replaced.equal(p) {
345                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
346                         added = true
347                 }
348         } else {
349                 added = true
350         }
351         t.openNewConns()
352         for t.peers.Len() > cl.config.TorrentPeersHighWater {
353                 _, ok := t.peers.DeleteMin()
354                 if ok {
355                         torrent.Add("excess reserve peers discarded", 1)
356                 }
357         }
358         return
359 }
360
361 func (t *Torrent) invalidateMetadata() {
362         for i := 0; i < len(t.metadataCompletedChunks); i++ {
363                 t.metadataCompletedChunks[i] = false
364         }
365         t.nameMu.Lock()
366         t.gotMetainfoC = make(chan struct{})
367         t.info = nil
368         t.nameMu.Unlock()
369 }
370
371 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
372         if t.haveInfo() {
373                 return
374         }
375         if index >= len(t.metadataCompletedChunks) {
376                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
377                 return
378         }
379         copy(t.metadataBytes[(1<<14)*index:], data)
380         t.metadataCompletedChunks[index] = true
381 }
382
383 func (t *Torrent) metadataPieceCount() int {
384         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
385 }
386
387 func (t *Torrent) haveMetadataPiece(piece int) bool {
388         if t.haveInfo() {
389                 return (1<<14)*piece < len(t.metadataBytes)
390         } else {
391                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
392         }
393 }
394
395 func (t *Torrent) metadataSize() int {
396         return len(t.metadataBytes)
397 }
398
399 func (t *Torrent) makePieces() {
400         t.pieces = make([]Piece, t.info.NumPieces())
401         for i := range t.pieces {
402                 piece := &t.pieces[i]
403                 piece.t = t
404                 piece.index = i
405                 piece.noPendingWrites.L = &piece.pendingWritesMutex
406                 if t.info.HasV1() {
407                         piece.hash = (*metainfo.Hash)(unsafe.Pointer(
408                                 unsafe.SliceData(t.info.Pieces[i*sha1.Size : (i+1)*sha1.Size])))
409                 }
410                 files := *t.files
411                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
412                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
413                 piece.files = files[beginFile:endFile]
414                 if t.info.FilesArePieceAligned() {
415                         numFiles := len(piece.files)
416                         if numFiles != 1 {
417                                 panic(fmt.Sprintf("%v:%v", beginFile, endFile))
418                         }
419                         if t.info.HasV2() {
420                                 file := piece.mustGetOnlyFile()
421                                 if file.numPieces() == 1 {
422                                         piece.hashV2.Set(file.piecesRoot.Unwrap())
423                                 }
424                         }
425                 }
426         }
427 }
428
429 func (t *Torrent) AddPieceLayers(layers map[string]string) (err error) {
430         if layers == nil {
431                 return
432         }
433         for _, f := range *t.files {
434                 if !f.piecesRoot.Ok {
435                         err = fmt.Errorf("no piece root set for file %v", f)
436                         return
437                 }
438                 compactLayer, ok := layers[string(f.piecesRoot.Value[:])]
439                 var hashes [][32]byte
440                 if ok {
441                         hashes, err = merkle.CompactLayerToSliceHashes(compactLayer)
442                         if err != nil {
443                                 err = fmt.Errorf("bad piece layers for file %q: %w", f, err)
444                                 return
445                         }
446                 } else if f.length > t.info.PieceLength {
447                         // BEP 52 is pretty strongly worded about this, even though we should be able to
448                         // recover: If a v2 torrent is added by magnet link or infohash, we need to fetch piece
449                         // layers ourselves anyway, and that's how we can recover from this.
450                         t.logger.Levelf(log.Warning, "no piece layers for file %q", f)
451                         continue
452                 } else {
453                         hashes = [][32]byte{f.piecesRoot.Value}
454                 }
455                 if len(hashes) != f.numPieces() {
456                         err = fmt.Errorf("file %q: got %v hashes expected %v", f, len(hashes), f.numPieces())
457                         return
458                 }
459                 for i := range f.numPieces() {
460                         pi := f.BeginPieceIndex() + i
461                         p := t.piece(pi)
462                         // See Torrent.onSetInfo. We want to trigger an initial check if appropriate, if we
463                         // didn't yet have a piece hash (can occur with v2 when we don't start with piece
464                         // layers).
465                         if !p.hashV2.Set(hashes[i]).Ok && p.hash == nil {
466                                 t.queueInitialPieceCheck(pi)
467                         }
468                 }
469         }
470         return nil
471 }
472
473 // Returns the index of the first file containing the piece. files must be
474 // ordered by offset.
475 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
476         for i, f := range files {
477                 if f.offset+f.length > pieceOffset {
478                         return i
479                 }
480         }
481         return 0
482 }
483
484 // Returns the index after the last file containing the piece. files must be
485 // ordered by offset.
486 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
487         for i, f := range files {
488                 if f.offset >= pieceEndOffset {
489                         return i
490                 }
491         }
492         return len(files)
493 }
494
495 func (t *Torrent) cacheLength() {
496         var l int64
497         for _, f := range t.info.UpvertedFiles() {
498                 l += f.Length
499         }
500         t._length = Some(l)
501 }
502
503 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
504 // separately.
505 func (t *Torrent) setInfo(info *metainfo.Info) error {
506         if err := validateInfo(info); err != nil {
507                 return fmt.Errorf("bad info: %s", err)
508         }
509         if t.storageOpener != nil {
510                 var err error
511                 t.storage, err = t.storageOpener.OpenTorrent(info, *t.canonicalShortInfohash())
512                 if err != nil {
513                         return fmt.Errorf("error opening torrent storage: %s", err)
514                 }
515         }
516         t.nameMu.Lock()
517         t.info = info
518         t.nameMu.Unlock()
519         t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
520         t.updateComplete()
521         t.displayName = "" // Save a few bytes lol.
522         t.initFiles()
523         t.cacheLength()
524         t.makePieces()
525         return nil
526 }
527
528 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
529         return request_strategy.PieceRequestOrderKey{
530                 InfoHash: *t.canonicalShortInfohash(),
531                 Index:    i,
532         }
533 }
534
535 // This seems to be all the follow-up tasks after info is set, that can't fail.
536 func (t *Torrent) onSetInfo() {
537         t.pieceRequestOrder = rand.Perm(t.numPieces())
538         t.initPieceRequestOrder()
539         MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
540         for i := range t.pieces {
541                 p := &t.pieces[i]
542                 // Need to add relativeAvailability before updating piece completion, as that may result in conns
543                 // being dropped.
544                 if p.relativeAvailability != 0 {
545                         panic(p.relativeAvailability)
546                 }
547                 p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
548                 t.addRequestOrderPiece(i)
549                 t.updatePieceCompletion(i)
550                 t.queueInitialPieceCheck(i)
551         }
552         t.cl.event.Broadcast()
553         close(t.gotMetainfoC)
554         t.updateWantPeersEvent()
555         t.requestState = make(map[RequestIndex]requestState)
556         t.tryCreateMorePieceHashers()
557         t.iterPeers(func(p *Peer) {
558                 p.onGotInfo(t.info)
559                 p.updateRequests("onSetInfo")
560         })
561 }
562
563 // Checks the info bytes hash to expected values. Fills in any missing infohashes.
564 func (t *Torrent) hashInfoBytes(b []byte, info *metainfo.Info) error {
565         v1Hash := infohash.HashBytes(b)
566         v2Hash := infohash_v2.HashBytes(b)
567         cl := t.cl
568         if t.infoHash.Ok && !t.infoHashV2.Ok {
569                 if v1Hash == t.infoHash.Value {
570                         if info.HasV2() {
571                                 t.infoHashV2.Set(v2Hash)
572                                 cl.torrentsByShortHash[*v2Hash.ToShort()] = t
573                         }
574                 } else if *v2Hash.ToShort() == t.infoHash.Value {
575                         if !info.HasV2() {
576                                 return errors.New("invalid v2 info")
577                         }
578                         t.infoHashV2.Set(v2Hash)
579                         t.infoHash.SetNone()
580                         if info.HasV1() {
581                                 cl.torrentsByShortHash[v1Hash] = t
582                                 t.infoHash.Set(v1Hash)
583                         }
584                 }
585         } else if t.infoHash.Ok && t.infoHashV2.Ok {
586                 if v1Hash != t.infoHash.Value {
587                         return errors.New("incorrect v1 infohash")
588                 }
589                 if v2Hash != t.infoHashV2.Value {
590                         return errors.New("incorrect v2 infohash")
591                 }
592         } else if !t.infoHash.Ok && t.infoHashV2.Ok {
593                 if v2Hash != t.infoHashV2.Value {
594                         return errors.New("incorrect v2 infohash")
595                 }
596                 if info.HasV1() {
597                         t.infoHash.Set(v1Hash)
598                         cl.torrentsByShortHash[v1Hash] = t
599                 }
600         } else {
601                 panic("no expected infohashes")
602         }
603         return nil
604 }
605
606 // Called when metadata for a torrent becomes available.
607 func (t *Torrent) setInfoBytesLocked(b []byte) (err error) {
608         var info metainfo.Info
609         err = bencode.Unmarshal(b, &info)
610         if err != nil {
611                 err = fmt.Errorf("unmarshalling info bytes: %w", err)
612                 return
613         }
614         err = t.hashInfoBytes(b, &info)
615         if err != nil {
616                 return
617         }
618         t.metadataBytes = b
619         t.metadataCompletedChunks = nil
620         if t.info != nil {
621                 return nil
622         }
623         if err := t.setInfo(&info); err != nil {
624                 return err
625         }
626         t.onSetInfo()
627         return nil
628 }
629
630 func (t *Torrent) haveAllMetadataPieces() bool {
631         if t.haveInfo() {
632                 return true
633         }
634         if t.metadataCompletedChunks == nil {
635                 return false
636         }
637         for _, have := range t.metadataCompletedChunks {
638                 if !have {
639                         return false
640                 }
641         }
642         return true
643 }
644
645 // TODO: Propagate errors to disconnect peer.
646 func (t *Torrent) setMetadataSize(size int) (err error) {
647         if t.haveInfo() {
648                 // We already know the correct metadata size.
649                 return
650         }
651         if uint32(size) > maxMetadataSize {
652                 return log.WithLevel(log.Warning, errors.New("bad size"))
653         }
654         if len(t.metadataBytes) == size {
655                 return
656         }
657         t.metadataBytes = make([]byte, size)
658         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
659         t.metadataChanged.Broadcast()
660         for c := range t.conns {
661                 c.requestPendingMetadata()
662         }
663         return
664 }
665
666 // The current working name for the torrent. Either the name in the info dict,
667 // or a display name given such as by the dn value in a magnet link, or "".
668 func (t *Torrent) name() string {
669         t.nameMu.RLock()
670         defer t.nameMu.RUnlock()
671         if t.haveInfo() {
672                 return t.info.BestName()
673         }
674         if t.displayName != "" {
675                 return t.displayName
676         }
677         return "infohash:" + t.canonicalShortInfohash().HexString()
678 }
679
680 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
681         p := &t.pieces[index]
682         ret.Priority = t.piecePriority(index)
683         ret.Completion = p.completion()
684         ret.QueuedForHash = p.queuedForHash()
685         ret.Hashing = p.hashing
686         ret.Checking = ret.QueuedForHash || ret.Hashing
687         ret.Marking = p.marking
688         if !ret.Complete && t.piecePartiallyDownloaded(index) {
689                 ret.Partial = true
690         }
691         if t.info.HasV2() && !p.hashV2.Ok {
692                 ret.MissingPieceLayerHash = true
693         }
694         return
695 }
696
697 func (t *Torrent) metadataPieceSize(piece int) int {
698         return metadataPieceSize(len(t.metadataBytes), piece)
699 }
700
701 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
702         return pp.Message{
703                 Type:       pp.Extended,
704                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
705                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
706                         Piece:     piece,
707                         TotalSize: len(t.metadataBytes),
708                         Type:      msgType,
709                 }), data...),
710         }
711 }
712
713 type pieceAvailabilityRun struct {
714         Count        pieceIndex
715         Availability int
716 }
717
718 func (me pieceAvailabilityRun) String() string {
719         return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
720 }
721
722 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
723         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
724                 ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
725         })
726         for i := range t.pieces {
727                 rle.Append(t.pieces[i].availability(), 1)
728         }
729         rle.Flush()
730         return
731 }
732
733 func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
734         freqs = make([]int, t.numActivePeers()+1)
735         for i := range t.pieces {
736                 freqs[t.piece(i).availability()]++
737         }
738         return
739 }
740
741 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
742         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
743                 ret = append(ret, PieceStateRun{
744                         PieceState: el.(PieceState),
745                         Length:     int(count),
746                 })
747         })
748         for index := range t.pieces {
749                 rle.Append(t.pieceState(index), 1)
750         }
751         rle.Flush()
752         return
753 }
754
755 // Produces a small string representing a PieceStateRun.
756 func (psr PieceStateRun) String() (ret string) {
757         ret = fmt.Sprintf("%d", psr.Length)
758         ret += func() string {
759                 switch psr.Priority {
760                 case PiecePriorityNext:
761                         return "N"
762                 case PiecePriorityNormal:
763                         return "."
764                 case PiecePriorityReadahead:
765                         return "R"
766                 case PiecePriorityNow:
767                         return "!"
768                 case PiecePriorityHigh:
769                         return "H"
770                 default:
771                         return ""
772                 }
773         }()
774         if psr.Hashing {
775                 ret += "H"
776         }
777         if psr.QueuedForHash {
778                 ret += "Q"
779         }
780         if psr.Marking {
781                 ret += "M"
782         }
783         if psr.Partial {
784                 ret += "P"
785         }
786         if psr.Complete {
787                 ret += "C"
788         }
789         if !psr.Ok {
790                 ret += "?"
791         }
792         if psr.MissingPieceLayerHash {
793                 ret += "h"
794         }
795         return
796 }
797
798 func (t *Torrent) writeStatus(w io.Writer) {
799         if t.infoHash.Ok {
800                 fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.Value.HexString())
801         }
802         if t.infoHashV2.Ok {
803                 fmt.Fprintf(w, "Infohash v2: %s\n", t.infoHashV2.Value.HexString())
804         }
805         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
806         if !t.haveInfo() {
807                 fmt.Fprintf(w, "Metadata have: ")
808                 for _, h := range t.metadataCompletedChunks {
809                         fmt.Fprintf(w, "%c", func() rune {
810                                 if h {
811                                         return 'H'
812                                 } else {
813                                         return '.'
814                                 }
815                         }())
816                 }
817                 fmt.Fprintln(w)
818         }
819         fmt.Fprintf(w, "Piece length: %s\n",
820                 func() string {
821                         if t.haveInfo() {
822                                 return fmt.Sprintf("%v (%v chunks)",
823                                         t.usualPieceSize(),
824                                         float64(t.usualPieceSize())/float64(t.chunkSize))
825                         } else {
826                                 return "no info"
827                         }
828                 }(),
829         )
830         if t.info != nil {
831                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
832                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
833                 // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
834                 // availability frequencies instead.
835                 if false {
836                         fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
837                                 for _, run := range t.pieceAvailabilityRuns() {
838                                         ret = append(ret, run.String())
839                                 }
840                                 return
841                         }(), " "))
842                 }
843                 fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
844                         func() (ret []string) {
845                                 for avail, freq := range t.pieceAvailabilityFrequencies() {
846                                         if freq == 0 {
847                                                 continue
848                                         }
849                                         ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
850                                 }
851                                 return
852                         }(),
853                         ", "))
854         }
855         fmt.Fprintf(w, "Reader Pieces:")
856         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
857                 fmt.Fprintf(w, " %d:%d", begin, end)
858                 return true
859         })
860         fmt.Fprintln(w)
861
862         fmt.Fprintf(w, "Enabled trackers:\n")
863         func() {
864                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
865                 fmt.Fprintf(tw, "    URL\tExtra\n")
866                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
867                         lu := l.URL()
868                         ru := r.URL()
869                         var luns, runs url.URL = *lu, *ru
870                         luns.Scheme = ""
871                         runs.Scheme = ""
872                         var ml missinggo.MultiLess
873                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
874                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
875                         return ml.Less()
876                 }).([]torrentTrackerAnnouncer) {
877                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
878                 }
879                 tw.Flush()
880         }()
881
882         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
883
884         dumpStats(w, t.statsLocked())
885
886         fmt.Fprintf(w, "webseeds:\n")
887         t.writePeerStatuses(w, maps.Values(t.webSeeds))
888
889         peerConns := maps.Keys(t.conns)
890         // Peers without priorities first, then those with. I'm undecided about how to order peers
891         // without priorities.
892         sort.Slice(peerConns, func(li, ri int) bool {
893                 l := peerConns[li]
894                 r := peerConns[ri]
895                 ml := multiless.New()
896                 lpp := g.ResultFromTuple(l.peerPriority()).ToOption()
897                 rpp := g.ResultFromTuple(r.peerPriority()).ToOption()
898                 ml = ml.Bool(lpp.Ok, rpp.Ok)
899                 ml = ml.Uint32(rpp.Value, lpp.Value)
900                 return ml.Less()
901         })
902
903         fmt.Fprintf(w, "%v peer conns:\n", len(peerConns))
904         t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer {
905                 return &pc.Peer
906         }))
907 }
908
909 func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) {
910         var buf bytes.Buffer
911         for _, c := range peers {
912                 fmt.Fprintf(w, "- ")
913                 buf.Reset()
914                 c.writeStatus(&buf)
915                 w.Write(bytes.TrimRight(
916                         bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n  ")),
917                         " "))
918         }
919 }
920
921 func (t *Torrent) haveInfo() bool {
922         return t.info != nil
923 }
924
925 // Returns a run-time generated MetaInfo that includes the info bytes and
926 // announce-list as currently known to the client.
927 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
928         return metainfo.MetaInfo{
929                 CreationDate: time.Now().Unix(),
930                 Comment:      "dynamic metainfo from client",
931                 CreatedBy:    "go.torrent",
932                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
933                 InfoBytes: func() []byte {
934                         if t.haveInfo() {
935                                 return t.metadataBytes
936                         } else {
937                                 return nil
938                         }
939                 }(),
940                 UrlList: func() []string {
941                         ret := make([]string, 0, len(t.webSeeds))
942                         for url := range t.webSeeds {
943                                 ret = append(ret, url)
944                         }
945                         return ret
946                 }(),
947         }
948 }
949
950 // Returns a count of bytes that are not complete in storage, and not pending being written to
951 // storage. This value is from the perspective of the download manager, and may not agree with the
952 // actual state in storage. If you want read data synchronously you should use a Reader. See
953 // https://github.com/anacrolix/torrent/issues/828.
954 func (t *Torrent) BytesMissing() (n int64) {
955         t.cl.rLock()
956         n = t.bytesMissingLocked()
957         t.cl.rUnlock()
958         return
959 }
960
961 func (t *Torrent) bytesMissingLocked() int64 {
962         return t.bytesLeft()
963 }
964
965 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
966         roaring.Flip(b, 0, end).Iterate(cb)
967 }
968
969 func (t *Torrent) bytesLeft() (left int64) {
970         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
971                 p := t.piece(pieceIndex(x))
972                 left += int64(p.length() - p.numDirtyBytes())
973                 return true
974         })
975         return
976 }
977
978 // Bytes left to give in tracker announces.
979 func (t *Torrent) bytesLeftAnnounce() int64 {
980         if t.haveInfo() {
981                 return t.bytesLeft()
982         } else {
983                 return -1
984         }
985 }
986
987 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
988         if t.pieceComplete(piece) {
989                 return false
990         }
991         if t.pieceAllDirty(piece) {
992                 return false
993         }
994         return t.pieces[piece].hasDirtyChunks()
995 }
996
997 func (t *Torrent) usualPieceSize() int {
998         return int(t.info.PieceLength)
999 }
1000
1001 func (t *Torrent) numPieces() pieceIndex {
1002         return t.info.NumPieces()
1003 }
1004
1005 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
1006         return pieceIndex(t._completedPieces.GetCardinality())
1007 }
1008
1009 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
1010         if !t.closed.Set() {
1011                 err = errors.New("already closed")
1012                 return
1013         }
1014         for _, f := range t.onClose {
1015                 f()
1016         }
1017         if t.storage != nil {
1018                 wg.Add(1)
1019                 go func() {
1020                         defer wg.Done()
1021                         t.storageLock.Lock()
1022                         defer t.storageLock.Unlock()
1023                         if f := t.storage.Close; f != nil {
1024                                 err1 := f()
1025                                 if err1 != nil {
1026                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
1027                                 }
1028                         }
1029                 }()
1030         }
1031         t.iterPeers(func(p *Peer) {
1032                 p.close()
1033         })
1034         if t.storage != nil {
1035                 t.deletePieceRequestOrder()
1036         }
1037         t.assertAllPiecesRelativeAvailabilityZero()
1038         t.pex.Reset()
1039         t.cl.event.Broadcast()
1040         t.pieceStateChanges.Close()
1041         t.updateWantPeersEvent()
1042         return
1043 }
1044
1045 func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
1046         for i := range t.pieces {
1047                 p := t.piece(i)
1048                 if p.relativeAvailability != 0 {
1049                         panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
1050                 }
1051         }
1052 }
1053
1054 func (t *Torrent) requestOffset(r Request) int64 {
1055         return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
1056 }
1057
1058 // Return the request that would include the given offset into the torrent data. Returns !ok if
1059 // there is no such request.
1060 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
1061         return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
1062 }
1063
1064 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
1065         //defer perf.ScopeTimerErr(&err)()
1066         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
1067         if err == nil && n != len(data) {
1068                 err = io.ErrShortWrite
1069         }
1070         return err
1071 }
1072
1073 func (t *Torrent) bitfield() (bf []bool) {
1074         bf = make([]bool, t.numPieces())
1075         t._completedPieces.Iterate(func(piece uint32) (again bool) {
1076                 bf[piece] = true
1077                 return true
1078         })
1079         return
1080 }
1081
1082 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
1083         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
1084 }
1085
1086 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
1087         return t._chunksPerRegularPiece
1088 }
1089
1090 func (t *Torrent) numChunks() RequestIndex {
1091         if t.numPieces() == 0 {
1092                 return 0
1093         }
1094         return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
1095 }
1096
1097 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
1098         t.dirtyChunks.RemoveRange(
1099                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
1100                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
1101 }
1102
1103 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
1104         if t.info.PieceLength == 0 {
1105                 // There will be no variance amongst pieces. Only pain.
1106                 return 0
1107         }
1108         if t.info.FilesArePieceAligned() {
1109                 p := t.piece(piece)
1110                 file := p.mustGetOnlyFile()
1111                 if piece == file.EndPieceIndex()-1 {
1112                         return pp.Integer(file.length - (p.torrentBeginOffset() - file.offset))
1113                 }
1114                 return pp.Integer(t.usualPieceSize())
1115         }
1116         if piece == t.numPieces()-1 {
1117                 ret := pp.Integer(t.length() % t.info.PieceLength)
1118                 if ret != 0 {
1119                         return ret
1120                 }
1121         }
1122         return pp.Integer(t.info.PieceLength)
1123 }
1124
1125 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
1126         return &blockCheckingWriter{
1127                 cache:        &t.smartBanCache,
1128                 requestIndex: t.pieceRequestIndexOffset(piece),
1129                 chunkSize:    t.chunkSize.Int(),
1130         }
1131 }
1132
1133 func (t *Torrent) hashPiece(piece pieceIndex) (
1134         correct bool,
1135         // These are peers that sent us blocks that differ from what we hash here.
1136         differingPeers map[bannableAddr]struct{},
1137         err error,
1138 ) {
1139         p := t.piece(piece)
1140         p.waitNoPendingWrites()
1141         storagePiece := p.Storage()
1142
1143         var h hash.Hash
1144         if p.hash != nil {
1145                 h = pieceHash.New()
1146
1147                 // Does the backend want to do its own hashing?
1148                 if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
1149                         var sum metainfo.Hash
1150                         // log.Printf("A piece decided to self-hash: %d", piece)
1151                         sum, err = i.SelfHash()
1152                         correct = sum == *p.hash
1153                         // Can't do smart banning without reading the piece. The smartBanCache is still cleared
1154                         // in pieceHasher regardless.
1155                         return
1156                 }
1157
1158         } else if p.hashV2.Ok {
1159                 h = merkle.NewHash()
1160         } else {
1161                 panic("no hash")
1162         }
1163
1164         const logPieceContents = false
1165         smartBanWriter := t.smartBanBlockCheckingWriter(piece)
1166         writers := []io.Writer{h, smartBanWriter}
1167         var examineBuf bytes.Buffer
1168         if logPieceContents {
1169                 writers = append(writers, &examineBuf)
1170         }
1171         var written int64
1172         written, err = storagePiece.WriteTo(io.MultiWriter(writers...))
1173         if err == nil && written != int64(p.length()) {
1174                 err = io.ErrShortWrite
1175         }
1176         if logPieceContents {
1177                 t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
1178         }
1179         smartBanWriter.Flush()
1180         differingPeers = smartBanWriter.badPeers
1181         if p.hash != nil {
1182                 var sum [20]byte
1183                 n := len(h.Sum(sum[:0]))
1184                 if n != 20 {
1185                         panic(n)
1186                 }
1187                 correct = sum == *p.hash
1188         } else if p.hashV2.Ok {
1189                 var sum [32]byte
1190                 n := len(h.Sum(sum[:0]))
1191                 if n != 32 {
1192                         panic(n)
1193                 }
1194                 correct = sum == p.hashV2.Value
1195         } else {
1196                 panic("no hash")
1197         }
1198         return
1199 }
1200
1201 func (t *Torrent) haveAnyPieces() bool {
1202         return !t._completedPieces.IsEmpty()
1203 }
1204
1205 func (t *Torrent) haveAllPieces() bool {
1206         if !t.haveInfo() {
1207                 return false
1208         }
1209         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
1210 }
1211
1212 func (t *Torrent) havePiece(index pieceIndex) bool {
1213         return t.haveInfo() && t.pieceComplete(index)
1214 }
1215
1216 func (t *Torrent) maybeDropMutuallyCompletePeer(
1217         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
1218         // okay?
1219         p *PeerConn,
1220 ) {
1221         if !t.cl.config.DropMutuallyCompletePeers {
1222                 return
1223         }
1224         if !t.haveAllPieces() {
1225                 return
1226         }
1227         if all, known := p.peerHasAllPieces(); !(known && all) {
1228                 return
1229         }
1230         if p.useful() {
1231                 return
1232         }
1233         p.logger.Levelf(log.Debug, "is mutually complete; dropping")
1234         p.drop()
1235 }
1236
1237 func (t *Torrent) haveChunk(r Request) (ret bool) {
1238         // defer func() {
1239         //      log.Println("have chunk", r, ret)
1240         // }()
1241         if !t.haveInfo() {
1242                 return false
1243         }
1244         if t.pieceComplete(pieceIndex(r.Index)) {
1245                 return true
1246         }
1247         p := &t.pieces[r.Index]
1248         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
1249 }
1250
1251 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
1252         return chunkIndexType(cs.Begin / chunkSize)
1253 }
1254
1255 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
1256         return !t._pendingPieces.IsEmpty() && t._pendingPieces.Contains(uint32(index))
1257 }
1258
1259 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1260 // conns (which is a map).
1261 var peerConnSlices sync.Pool
1262
1263 func getPeerConnSlice(cap int) []*PeerConn {
1264         getInterface := peerConnSlices.Get()
1265         if getInterface == nil {
1266                 return make([]*PeerConn, 0, cap)
1267         } else {
1268                 return getInterface.([]*PeerConn)[:0]
1269         }
1270 }
1271
1272 // Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
1273 // this is a frequent occurrence.
1274 func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
1275         sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
1276         f(sl)
1277         peerConnSlices.Put(sl)
1278 }
1279
1280 func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
1281         wcs := worseConnSlice{conns: sl}
1282         wcs.initKeys(opts)
1283         heap.Init(&wcs)
1284         for wcs.Len() != 0 {
1285                 c := heap.Pop(&wcs).(*PeerConn)
1286                 if opts.incomingIsBad && !c.outgoing {
1287                         return c
1288                 }
1289                 if opts.outgoingIsBad && c.outgoing {
1290                         return c
1291                 }
1292                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1293                         return c
1294                 }
1295                 // If the connection is in the worst half of the established
1296                 // connection quota and is older than a minute.
1297                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1298                         // Give connections 1 minute to prove themselves.
1299                         if time.Since(c.completedHandshake) > time.Minute {
1300                                 return c
1301                         }
1302                 }
1303         }
1304         return nil
1305 }
1306
1307 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1308 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1309 // established connections for more than a minute. This is O(n log n). If there was a way to not
1310 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1311 func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
1312         t.withUnclosedConns(func(ucs []*PeerConn) {
1313                 ret = t.worstBadConnFromSlice(opts, ucs)
1314         })
1315         return
1316 }
1317
1318 type PieceStateChange struct {
1319         Index int
1320         PieceState
1321 }
1322
1323 func (t *Torrent) publishPieceStateChange(piece pieceIndex) {
1324         t.cl._mu.Defer(func() {
1325                 cur := t.pieceState(piece)
1326                 p := &t.pieces[piece]
1327                 if cur != p.publicPieceState {
1328                         p.publicPieceState = cur
1329                         t.pieceStateChanges.Publish(PieceStateChange{
1330                                 int(piece),
1331                                 cur,
1332                         })
1333                 }
1334         })
1335 }
1336
1337 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1338         if t.pieceComplete(piece) {
1339                 return 0
1340         }
1341         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1342 }
1343
1344 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1345         return t.pieces[piece].allChunksDirty()
1346 }
1347
1348 func (t *Torrent) readersChanged() {
1349         t.updateReaderPieces()
1350         t.updateAllPiecePriorities("Torrent.readersChanged")
1351 }
1352
1353 func (t *Torrent) updateReaderPieces() {
1354         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1355 }
1356
1357 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1358         if from == to {
1359                 return
1360         }
1361         t.updateReaderPieces()
1362         // Order the ranges, high and low.
1363         l, h := from, to
1364         if l.begin > h.begin {
1365                 l, h = h, l
1366         }
1367         if l.end < h.begin {
1368                 // Two distinct ranges.
1369                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1370                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1371         } else {
1372                 // Ranges overlap.
1373                 end := l.end
1374                 if h.end > end {
1375                         end = h.end
1376                 }
1377                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1378         }
1379 }
1380
1381 func (t *Torrent) maybeNewConns() {
1382         // Tickle the accept routine.
1383         t.cl.event.Broadcast()
1384         t.openNewConns()
1385 }
1386
1387 func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason string) {
1388         if t._pendingPieces.Contains(uint32(piece)) {
1389                 t.iterPeers(func(c *Peer) {
1390                         // if c.requestState.Interested {
1391                         //      return
1392                         // }
1393                         if !c.isLowOnRequests() {
1394                                 return
1395                         }
1396                         if !c.peerHasPiece(piece) {
1397                                 return
1398                         }
1399                         if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
1400                                 return
1401                         }
1402                         c.updateRequests(reason)
1403                 })
1404         }
1405         t.maybeNewConns()
1406         t.publishPieceStateChange(piece)
1407 }
1408
1409 func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) {
1410         if !t.closed.IsSet() {
1411                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1412                 // request order.
1413                 t.updatePieceRequestOrderPiece(piece)
1414         }
1415         p := &t.pieces[piece]
1416         newPrio := p.uncachedPriority()
1417         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1418         if newPrio == PiecePriorityNone {
1419                 return t._pendingPieces.CheckedRemove(uint32(piece))
1420         } else {
1421                 return t._pendingPieces.CheckedAdd(uint32(piece))
1422         }
1423 }
1424
1425 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1426         if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
1427                 t.onPiecePendingTriggers(piece, reason)
1428         }
1429         t.updatePieceRequestOrderPiece(piece)
1430 }
1431
1432 func (t *Torrent) updateAllPiecePriorities(reason string) {
1433         t.updatePiecePriorities(0, t.numPieces(), reason)
1434 }
1435
1436 // Update all piece priorities in one hit. This function should have the same
1437 // output as updatePiecePriority, but across all pieces.
1438 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1439         for i := begin; i < end; i++ {
1440                 t.updatePiecePriority(i, reason)
1441         }
1442 }
1443
1444 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1445 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1446         if off >= t.length() {
1447                 return
1448         }
1449         if off < 0 {
1450                 size += off
1451                 off = 0
1452         }
1453         if size <= 0 {
1454                 return
1455         }
1456         begin = pieceIndex(off / t.info.PieceLength)
1457         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1458         if end > pieceIndex(t.info.NumPieces()) {
1459                 end = pieceIndex(t.info.NumPieces())
1460         }
1461         return
1462 }
1463
1464 // Returns true if all iterations complete without breaking. Returns the read regions for all
1465 // readers. The reader regions should not be merged as some callers depend on this method to
1466 // enumerate readers.
1467 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1468         for r := range t.readers {
1469                 p := r.pieces
1470                 if p.begin >= p.end {
1471                         continue
1472                 }
1473                 if !f(p.begin, p.end) {
1474                         return false
1475                 }
1476         }
1477         return true
1478 }
1479
1480 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1481         return t.piece(piece).uncachedPriority()
1482 }
1483
1484 func (t *Torrent) pendRequest(req RequestIndex) {
1485         t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
1486 }
1487
1488 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1489         t.cl.event.Broadcast()
1490         if t.pieceComplete(piece) {
1491                 t.onPieceCompleted(piece)
1492         } else {
1493                 t.onIncompletePiece(piece)
1494         }
1495         t.updatePiecePriority(piece, reason)
1496 }
1497
1498 func (t *Torrent) numReceivedConns() (ret int) {
1499         for c := range t.conns {
1500                 if c.Discovery == PeerSourceIncoming {
1501                         ret++
1502                 }
1503         }
1504         return
1505 }
1506
1507 func (t *Torrent) numOutgoingConns() (ret int) {
1508         for c := range t.conns {
1509                 if c.outgoing {
1510                         ret++
1511                 }
1512         }
1513         return
1514 }
1515
1516 func (t *Torrent) maxHalfOpen() int {
1517         // Note that if we somehow exceed the maximum established conns, we want
1518         // the negative value to have an effect.
1519         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1520         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1521         // We want to allow some experimentation with new peers, and to try to
1522         // upset an oversupply of received connections.
1523         return int(min(
1524                 max(5, extraIncoming)+establishedHeadroom,
1525                 int64(t.cl.config.HalfOpenConnsPerTorrent),
1526         ))
1527 }
1528
1529 func (t *Torrent) openNewConns() (initiated int) {
1530         defer t.updateWantPeersEvent()
1531         for t.peers.Len() != 0 {
1532                 if !t.wantOutgoingConns() {
1533                         return
1534                 }
1535                 if len(t.halfOpen) >= t.maxHalfOpen() {
1536                         return
1537                 }
1538                 if len(t.cl.dialers) == 0 {
1539                         return
1540                 }
1541                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1542                         return
1543                 }
1544                 p := t.peers.PopMax()
1545                 opts := outgoingConnOpts{
1546                         peerInfo:                 p,
1547                         t:                        t,
1548                         requireRendezvous:        false,
1549                         skipHolepunchRendezvous:  false,
1550                         receivedHolepunchConnect: false,
1551                         HeaderObfuscationPolicy:  t.cl.config.HeaderObfuscationPolicy,
1552                 }
1553                 initiateConn(opts, false)
1554                 initiated++
1555         }
1556         return
1557 }
1558
1559 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1560         p := t.piece(piece)
1561         uncached := t.pieceCompleteUncached(piece)
1562         cached := p.completion()
1563         changed := cached != uncached
1564         complete := uncached.Complete
1565         p.storageCompletionOk = uncached.Ok
1566         x := uint32(piece)
1567         if complete {
1568                 t._completedPieces.Add(x)
1569                 t.openNewConns()
1570         } else {
1571                 t._completedPieces.Remove(x)
1572         }
1573         p.t.updatePieceRequestOrderPiece(piece)
1574         t.updateComplete()
1575         if complete && len(p.dirtiers) != 0 {
1576                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1577         }
1578         if changed {
1579                 //slog.Debug(
1580                 //      "piece completion changed",
1581                 //      slog.Int("piece", piece),
1582                 //      slog.Any("from", cached),
1583                 //      slog.Any("to", uncached))
1584                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1585         }
1586         return changed
1587 }
1588
1589 // Non-blocking read. Client lock is not required.
1590 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1591         for len(b) != 0 {
1592                 p := &t.pieces[off/t.info.PieceLength]
1593                 p.waitNoPendingWrites()
1594                 var n1 int
1595                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1596                 if n1 == 0 {
1597                         break
1598                 }
1599                 off += int64(n1)
1600                 n += n1
1601                 b = b[n1:]
1602         }
1603         return
1604 }
1605
1606 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1607 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1608 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1609 func (t *Torrent) maybeCompleteMetadata() error {
1610         if t.haveInfo() {
1611                 // Nothing to do.
1612                 return nil
1613         }
1614         if !t.haveAllMetadataPieces() {
1615                 // Don't have enough metadata pieces.
1616                 return nil
1617         }
1618         err := t.setInfoBytesLocked(t.metadataBytes)
1619         if err != nil {
1620                 t.invalidateMetadata()
1621                 return fmt.Errorf("error setting info bytes: %s", err)
1622         }
1623         if t.cl.config.Debug {
1624                 t.logger.Printf("%s: got metadata from peers", t)
1625         }
1626         return nil
1627 }
1628
1629 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1630         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1631                 if end > begin {
1632                         now.Add(bitmap.BitIndex(begin))
1633                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1634                 }
1635                 return true
1636         })
1637         return
1638 }
1639
1640 func (t *Torrent) needData() bool {
1641         if t.closed.IsSet() {
1642                 return false
1643         }
1644         if !t.haveInfo() {
1645                 return true
1646         }
1647         return !t._pendingPieces.IsEmpty()
1648 }
1649
1650 func appendMissingStrings(old, new []string) (ret []string) {
1651         ret = old
1652 new:
1653         for _, n := range new {
1654                 for _, o := range old {
1655                         if o == n {
1656                                 continue new
1657                         }
1658                 }
1659                 ret = append(ret, n)
1660         }
1661         return
1662 }
1663
1664 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1665         ret = existing
1666         for minNumTiers > len(ret) {
1667                 ret = append(ret, nil)
1668         }
1669         return
1670 }
1671
1672 func (t *Torrent) addTrackers(announceList [][]string) {
1673         fullAnnounceList := &t.metainfo.AnnounceList
1674         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1675         for tierIndex, trackerURLs := range announceList {
1676                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1677         }
1678         t.startMissingTrackerScrapers()
1679         t.updateWantPeersEvent()
1680 }
1681
1682 // Don't call this before the info is available.
1683 func (t *Torrent) bytesCompleted() int64 {
1684         if !t.haveInfo() {
1685                 return 0
1686         }
1687         return t.length() - t.bytesLeft()
1688 }
1689
1690 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1691         t.cl.lock()
1692         defer t.cl.unlock()
1693         return t.setInfoBytesLocked(b)
1694 }
1695
1696 // Returns true if connection is removed from torrent.Conns.
1697 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1698         if !c.closed.IsSet() {
1699                 panic("connection is not closed")
1700                 // There are behaviours prevented by the closed state that will fail
1701                 // if the connection has been deleted.
1702         }
1703         _, ret = t.conns[c]
1704         delete(t.conns, c)
1705         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1706         // the drop event against the PexConnState instead.
1707         if ret {
1708                 if !t.cl.config.DisablePEX {
1709                         t.pex.Drop(c)
1710                 }
1711         }
1712         torrent.Add("deleted connections", 1)
1713         c.deleteAllRequests("Torrent.deletePeerConn")
1714         t.assertPendingRequests()
1715         if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
1716                 panic(t.connsWithAllPieces)
1717         }
1718         return
1719 }
1720
1721 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1722         if t.deleteConnWithAllPieces(p) {
1723                 return
1724         }
1725         if !t.haveInfo() {
1726                 return
1727         }
1728         p.peerPieces().Iterate(func(i uint32) bool {
1729                 p.t.decPieceAvailability(pieceIndex(i))
1730                 return true
1731         })
1732 }
1733
1734 func (t *Torrent) assertPendingRequests() {
1735         if !check.Enabled {
1736                 return
1737         }
1738         // var actual pendingRequests
1739         // if t.haveInfo() {
1740         //      actual.m = make([]int, t.numChunks())
1741         // }
1742         // t.iterPeers(func(p *Peer) {
1743         //      p.requestState.Requests.Iterate(func(x uint32) bool {
1744         //              actual.Inc(x)
1745         //              return true
1746         //      })
1747         // })
1748         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1749         // if diff != "" {
1750         //      panic(diff)
1751         // }
1752 }
1753
1754 func (t *Torrent) dropConnection(c *PeerConn) {
1755         t.cl.event.Broadcast()
1756         c.close()
1757         if t.deletePeerConn(c) {
1758                 t.openNewConns()
1759         }
1760 }
1761
1762 // Peers as in contact information for dialing out.
1763 func (t *Torrent) wantPeers() bool {
1764         if t.closed.IsSet() {
1765                 return false
1766         }
1767         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1768                 return false
1769         }
1770         return t.wantOutgoingConns()
1771 }
1772
1773 func (t *Torrent) updateWantPeersEvent() {
1774         if t.wantPeers() {
1775                 t.wantPeersEvent.Set()
1776         } else {
1777                 t.wantPeersEvent.Clear()
1778         }
1779 }
1780
1781 // Returns whether the client should make effort to seed the torrent.
1782 func (t *Torrent) seeding() bool {
1783         cl := t.cl
1784         if t.closed.IsSet() {
1785                 return false
1786         }
1787         if t.dataUploadDisallowed {
1788                 return false
1789         }
1790         if cl.config.NoUpload {
1791                 return false
1792         }
1793         if !cl.config.Seed {
1794                 return false
1795         }
1796         if cl.config.DisableAggressiveUpload && t.needData() {
1797                 return false
1798         }
1799         return true
1800 }
1801
1802 func (t *Torrent) onWebRtcConn(
1803         c datachannel.ReadWriteCloser,
1804         dcc webtorrent.DataChannelContext,
1805 ) {
1806         defer c.Close()
1807         netConn := webrtcNetConn{
1808                 ReadWriteCloser:    c,
1809                 DataChannelContext: dcc,
1810         }
1811         peerRemoteAddr := netConn.RemoteAddr()
1812         //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
1813         if t.cl.badPeerAddr(peerRemoteAddr) {
1814                 return
1815         }
1816         localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
1817         pc, err := t.cl.initiateProtocolHandshakes(
1818                 context.Background(),
1819                 netConn,
1820                 t,
1821                 false,
1822                 newConnectionOpts{
1823                         outgoing:        dcc.LocalOffered,
1824                         remoteAddr:      peerRemoteAddr,
1825                         localPublicAddr: localAddrIpPort,
1826                         network:         webrtcNetwork,
1827                         connString:      fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
1828                 },
1829         )
1830         if err != nil {
1831                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1832                 return
1833         }
1834         if dcc.LocalOffered {
1835                 pc.Discovery = PeerSourceTracker
1836         } else {
1837                 pc.Discovery = PeerSourceIncoming
1838         }
1839         pc.conn.SetWriteDeadline(time.Time{})
1840         t.cl.lock()
1841         defer t.cl.unlock()
1842         err = t.runHandshookConn(pc)
1843         if err != nil {
1844                 t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
1845         }
1846 }
1847
1848 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1849         err := t.runHandshookConn(pc)
1850         if err != nil || logAll {
1851                 t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
1852         }
1853 }
1854
1855 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1856         t.logRunHandshookConn(pc, false, log.Debug)
1857 }
1858
1859 func (t *Torrent) startWebsocketAnnouncer(u url.URL, shortInfohash [20]byte) torrentTrackerAnnouncer {
1860         wtc, release := t.cl.websocketTrackers.Get(u.String(), shortInfohash)
1861         // This needs to run before the Torrent is dropped from the Client, to prevent a new
1862         // webtorrent.TrackerClient for the same info hash before the old one is cleaned up.
1863         t.onClose = append(t.onClose, release)
1864         wst := websocketTrackerStatus{u, wtc}
1865         go func() {
1866                 err := wtc.Announce(tracker.Started, shortInfohash)
1867                 if err != nil {
1868                         t.logger.WithDefaultLevel(log.Warning).Printf(
1869                                 "error in initial announce to %q: %v",
1870                                 u.String(), err,
1871                         )
1872                 }
1873         }()
1874         return wst
1875 }
1876
1877 func (t *Torrent) startScrapingTracker(_url string) {
1878         if _url == "" {
1879                 return
1880         }
1881         u, err := url.Parse(_url)
1882         if err != nil {
1883                 // URLs with a leading '*' appear to be a uTorrent convention to disable trackers.
1884                 if _url[0] != '*' {
1885                         t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err)
1886                 }
1887                 return
1888         }
1889         if u.Scheme == "udp" {
1890                 u.Scheme = "udp4"
1891                 t.startScrapingTracker(u.String())
1892                 u.Scheme = "udp6"
1893                 t.startScrapingTracker(u.String())
1894                 return
1895         }
1896         if t.infoHash.Ok {
1897                 t.startScrapingTrackerWithInfohash(u, _url, t.infoHash.Value)
1898         }
1899         if t.infoHashV2.Ok {
1900                 t.startScrapingTrackerWithInfohash(u, _url, *t.infoHashV2.Value.ToShort())
1901         }
1902 }
1903
1904 func (t *Torrent) startScrapingTrackerWithInfohash(u *url.URL, urlStr string, shortInfohash [20]byte) {
1905         announcerKey := torrentTrackerAnnouncerKey{
1906                 shortInfohash: shortInfohash,
1907                 url:           urlStr,
1908         }
1909         if _, ok := t.trackerAnnouncers[announcerKey]; ok {
1910                 return
1911         }
1912         sl := func() torrentTrackerAnnouncer {
1913                 switch u.Scheme {
1914                 case "ws", "wss":
1915                         if t.cl.config.DisableWebtorrent {
1916                                 return nil
1917                         }
1918                         return t.startWebsocketAnnouncer(*u, shortInfohash)
1919                 case "udp4":
1920                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1921                                 return nil
1922                         }
1923                 case "udp6":
1924                         if t.cl.config.DisableIPv6 {
1925                                 return nil
1926                         }
1927                 }
1928                 newAnnouncer := &trackerScraper{
1929                         shortInfohash:   shortInfohash,
1930                         u:               *u,
1931                         t:               t,
1932                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1933                 }
1934                 go newAnnouncer.Run()
1935                 return newAnnouncer
1936         }()
1937         if sl == nil {
1938                 return
1939         }
1940         g.MakeMapIfNil(&t.trackerAnnouncers)
1941         if g.MapInsert(t.trackerAnnouncers, announcerKey, sl).Ok {
1942                 panic("tracker announcer already exists")
1943         }
1944 }
1945
1946 // Adds and starts tracker scrapers for tracker URLs that aren't already
1947 // running.
1948 func (t *Torrent) startMissingTrackerScrapers() {
1949         if t.cl.config.DisableTrackers {
1950                 return
1951         }
1952         t.startScrapingTracker(t.metainfo.Announce)
1953         for _, tier := range t.metainfo.AnnounceList {
1954                 for _, url := range tier {
1955                         t.startScrapingTracker(url)
1956                 }
1957         }
1958 }
1959
1960 // Returns an AnnounceRequest with fields filled out to defaults and current
1961 // values.
1962 func (t *Torrent) announceRequest(
1963         event tracker.AnnounceEvent,
1964         shortInfohash [20]byte,
1965 ) tracker.AnnounceRequest {
1966         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1967         // dependent on the network in use.
1968         return tracker.AnnounceRequest{
1969                 Event: event,
1970                 NumWant: func() int32 {
1971                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1972                                 // Windozer has UDP packet limit. See:
1973                                 // https://github.com/anacrolix/torrent/issues/764
1974                                 return 200
1975                         } else {
1976                                 return 0
1977                         }
1978                 }(),
1979                 Port:     uint16(t.cl.incomingPeerPort()),
1980                 PeerId:   t.cl.peerID,
1981                 InfoHash: shortInfohash,
1982                 Key:      t.cl.announceKey(),
1983
1984                 // The following are vaguely described in BEP 3.
1985
1986                 Left:     t.bytesLeftAnnounce(),
1987                 Uploaded: t.stats.BytesWrittenData.Int64(),
1988                 // There's no mention of wasted or unwanted download in the BEP.
1989                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1990         }
1991 }
1992
1993 // Adds peers revealed in an announce until the announce ends, or we have
1994 // enough peers.
1995 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1996         cl := t.cl
1997         for v := range pvs {
1998                 cl.lock()
1999                 added := 0
2000                 for _, cp := range v.Peers {
2001                         if cp.Port == 0 {
2002                                 // Can't do anything with this.
2003                                 continue
2004                         }
2005                         if t.addPeer(PeerInfo{
2006                                 Addr:   ipPortAddr{cp.IP, cp.Port},
2007                                 Source: PeerSourceDhtGetPeers,
2008                         }) {
2009                                 added++
2010                         }
2011                 }
2012                 cl.unlock()
2013                 // if added != 0 {
2014                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
2015                 // }
2016         }
2017 }
2018
2019 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
2020 // announce ends. stop will force the announce to end. This interface is really old-school, and
2021 // calls a private one that is much more modern. Both v1 and v2 info hashes are announced if they
2022 // exist.
2023 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
2024         var ihs [][20]byte
2025         t.cl.lock()
2026         t.eachShortInfohash(func(short [20]byte) {
2027                 ihs = append(ihs, short)
2028         })
2029         t.cl.unlock()
2030         ctx, stop := context.WithCancel(context.Background())
2031         eg, ctx := errgroup.WithContext(ctx)
2032         for _, ih := range ihs {
2033                 var ann DhtAnnounce
2034                 ann, err = s.Announce(ih, t.cl.incomingPeerPort(), true)
2035                 if err != nil {
2036                         stop()
2037                         return
2038                 }
2039                 eg.Go(func() error {
2040                         return t.dhtAnnounceConsumer(ctx, ann)
2041                 })
2042         }
2043         _done := make(chan struct{})
2044         done = _done
2045         go func() {
2046                 defer stop()
2047                 defer close(_done)
2048                 eg.Wait()
2049         }()
2050         return
2051 }
2052
2053 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
2054 // announce ends. stop will force the announce to end.
2055 func (t *Torrent) dhtAnnounceConsumer(
2056         ctx context.Context,
2057         ps DhtAnnounce,
2058 ) (
2059         err error,
2060 ) {
2061         defer ps.Close()
2062         done := make(chan struct{})
2063         go func() {
2064                 defer close(done)
2065                 t.consumeDhtAnnouncePeers(ps.Peers())
2066         }()
2067         select {
2068         case <-ctx.Done():
2069                 return context.Cause(ctx)
2070         case <-done:
2071                 return nil
2072         }
2073 }
2074
2075 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
2076         _, stop, err := t.AnnounceToDht(s)
2077         if err != nil {
2078                 return err
2079         }
2080         select {
2081         case <-t.closed.Done():
2082         case <-time.After(5 * time.Minute):
2083         }
2084         stop()
2085         return nil
2086 }
2087
2088 func (t *Torrent) dhtAnnouncer(s DhtServer) {
2089         cl := t.cl
2090         cl.lock()
2091         defer cl.unlock()
2092         for {
2093                 for {
2094                         if t.closed.IsSet() {
2095                                 return
2096                         }
2097                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
2098                         // TODO: We can include the announce_peer step depending on whether we can receive
2099                         // inbound connections. We should probably only announce once every 15 mins too.
2100                         if !t.wantAnyConns() {
2101                                 goto wait
2102                         }
2103                         // TODO: Determine if there's a listener on the port we're announcing.
2104                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
2105                                 goto wait
2106                         }
2107                         break
2108                 wait:
2109                         cl.event.Wait()
2110                 }
2111                 func() {
2112                         t.numDHTAnnounces++
2113                         cl.unlock()
2114                         defer cl.lock()
2115                         err := t.timeboxedAnnounceToDht(s)
2116                         if err != nil {
2117                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
2118                         }
2119                 }()
2120         }
2121 }
2122
2123 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
2124         for _, p := range peers {
2125                 if t.addPeer(p) {
2126                         added++
2127                 }
2128         }
2129         return
2130 }
2131
2132 // The returned TorrentStats may require alignment in memory. See
2133 // https://github.com/anacrolix/torrent/issues/383.
2134 func (t *Torrent) Stats() TorrentStats {
2135         t.cl.rLock()
2136         defer t.cl.rUnlock()
2137         return t.statsLocked()
2138 }
2139
2140 func (t *Torrent) statsLocked() (ret TorrentStats) {
2141         ret.ActivePeers = len(t.conns)
2142         ret.HalfOpenPeers = len(t.halfOpen)
2143         ret.PendingPeers = t.peers.Len()
2144         ret.TotalPeers = t.numTotalPeers()
2145         ret.ConnectedSeeders = 0
2146         for c := range t.conns {
2147                 if all, ok := c.peerHasAllPieces(); all && ok {
2148                         ret.ConnectedSeeders++
2149                 }
2150         }
2151         ret.ConnStats = t.stats.Copy()
2152         ret.PiecesComplete = t.numPiecesCompleted()
2153         return
2154 }
2155
2156 // The total number of peers in the torrent.
2157 func (t *Torrent) numTotalPeers() int {
2158         peers := make(map[string]struct{})
2159         for conn := range t.conns {
2160                 ra := conn.conn.RemoteAddr()
2161                 if ra == nil {
2162                         // It's been closed and doesn't support RemoteAddr.
2163                         continue
2164                 }
2165                 peers[ra.String()] = struct{}{}
2166         }
2167         for addr := range t.halfOpen {
2168                 peers[addr] = struct{}{}
2169         }
2170         t.peers.Each(func(peer PeerInfo) {
2171                 peers[peer.Addr.String()] = struct{}{}
2172         })
2173         return len(peers)
2174 }
2175
2176 // Reconcile bytes transferred before connection was associated with a
2177 // torrent.
2178 func (t *Torrent) reconcileHandshakeStats(c *Peer) {
2179         if c._stats != (ConnStats{
2180                 // Handshakes should only increment these fields:
2181                 BytesWritten: c._stats.BytesWritten,
2182                 BytesRead:    c._stats.BytesRead,
2183         }) {
2184                 panic("bad stats")
2185         }
2186         c.postHandshakeStats(func(cs *ConnStats) {
2187                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
2188                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
2189         })
2190         c.reconciledHandshakeStats = true
2191 }
2192
2193 // Returns true if the connection is added.
2194 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
2195         defer func() {
2196                 if err == nil {
2197                         torrent.Add("added connections", 1)
2198                 }
2199         }()
2200         if t.closed.IsSet() {
2201                 return errors.New("torrent closed")
2202         }
2203         for c0 := range t.conns {
2204                 if c.PeerID != c0.PeerID {
2205                         continue
2206                 }
2207                 if !t.cl.config.DropDuplicatePeerIds {
2208                         continue
2209                 }
2210                 if c.hasPreferredNetworkOver(c0) {
2211                         c0.close()
2212                         t.deletePeerConn(c0)
2213                 } else {
2214                         return errors.New("existing connection preferred")
2215                 }
2216         }
2217         if len(t.conns) >= t.maxEstablishedConns {
2218                 numOutgoing := t.numOutgoingConns()
2219                 numIncoming := len(t.conns) - numOutgoing
2220                 c := t.worstBadConn(worseConnLensOpts{
2221                         // We've already established that we have too many connections at this point, so we just
2222                         // need to match what kind we have too many of vs. what we're trying to add now.
2223                         incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
2224                         outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
2225                 })
2226                 if c == nil {
2227                         return errors.New("don't want conn")
2228                 }
2229                 c.close()
2230                 t.deletePeerConn(c)
2231         }
2232         if len(t.conns) >= t.maxEstablishedConns {
2233                 panic(len(t.conns))
2234         }
2235         t.conns[c] = struct{}{}
2236         t.cl.event.Broadcast()
2237         // We'll never receive the "p" extended handshake parameter.
2238         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
2239                 t.pex.Add(c)
2240         }
2241         return nil
2242 }
2243
2244 func (t *Torrent) newConnsAllowed() bool {
2245         if !t.networkingEnabled.Bool() {
2246                 return false
2247         }
2248         if t.closed.IsSet() {
2249                 return false
2250         }
2251         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2252                 return false
2253         }
2254         return true
2255 }
2256
2257 func (t *Torrent) wantAnyConns() bool {
2258         if !t.networkingEnabled.Bool() {
2259                 return false
2260         }
2261         if t.closed.IsSet() {
2262                 return false
2263         }
2264         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2265                 return false
2266         }
2267         return len(t.conns) < t.maxEstablishedConns
2268 }
2269
2270 func (t *Torrent) wantOutgoingConns() bool {
2271         if !t.newConnsAllowed() {
2272                 return false
2273         }
2274         if len(t.conns) < t.maxEstablishedConns {
2275                 return true
2276         }
2277         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2278         return t.worstBadConn(worseConnLensOpts{
2279                 incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
2280                 outgoingIsBad: false,
2281         }) != nil
2282 }
2283
2284 func (t *Torrent) wantIncomingConns() bool {
2285         if !t.newConnsAllowed() {
2286                 return false
2287         }
2288         if len(t.conns) < t.maxEstablishedConns {
2289                 return true
2290         }
2291         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2292         return t.worstBadConn(worseConnLensOpts{
2293                 incomingIsBad: false,
2294                 outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
2295         }) != nil
2296 }
2297
2298 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
2299         t.cl.lock()
2300         defer t.cl.unlock()
2301         oldMax = t.maxEstablishedConns
2302         t.maxEstablishedConns = max
2303         wcs := worseConnSlice{
2304                 conns: t.appendConns(nil, func(*PeerConn) bool {
2305                         return true
2306                 }),
2307         }
2308         wcs.initKeys(worseConnLensOpts{})
2309         heap.Init(&wcs)
2310         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
2311                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
2312         }
2313         t.openNewConns()
2314         return oldMax
2315 }
2316
2317 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
2318         t.logger.LazyLog(log.Debug, func() log.Msg {
2319                 return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
2320         })
2321         p := t.piece(piece)
2322         p.numVerifies++
2323         t.cl.event.Broadcast()
2324         if t.closed.IsSet() {
2325                 return
2326         }
2327
2328         // Don't score the first time a piece is hashed, it could be an initial check.
2329         if p.storageCompletionOk {
2330                 if passed {
2331                         pieceHashedCorrect.Add(1)
2332                 } else {
2333                         log.Fmsg(
2334                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
2335                         ).AddValues(t, p).LogLevel(log.Info, t.logger)
2336                         pieceHashedNotCorrect.Add(1)
2337                 }
2338         }
2339
2340         p.marking = true
2341         t.publishPieceStateChange(piece)
2342         defer func() {
2343                 p.marking = false
2344                 t.publishPieceStateChange(piece)
2345         }()
2346
2347         if passed {
2348                 if len(p.dirtiers) != 0 {
2349                         // Don't increment stats above connection-level for every involved connection.
2350                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
2351                 }
2352                 for c := range p.dirtiers {
2353                         c._stats.incrementPiecesDirtiedGood()
2354                 }
2355                 t.clearPieceTouchers(piece)
2356                 hasDirty := p.hasDirtyChunks()
2357                 t.cl.unlock()
2358                 if hasDirty {
2359                         p.Flush() // You can be synchronous here!
2360                 }
2361                 err := p.Storage().MarkComplete()
2362                 if err != nil {
2363                         t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
2364                 }
2365                 t.cl.lock()
2366
2367                 if t.closed.IsSet() {
2368                         return
2369                 }
2370                 t.pendAllChunkSpecs(piece)
2371         } else {
2372                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
2373                         // Peers contributed to all the data for this piece hash failure, and the failure was
2374                         // not due to errors in the storage (such as data being dropped in a cache).
2375
2376                         // Increment Torrent and above stats, and then specific connections.
2377                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
2378                         for c := range p.dirtiers {
2379                                 // Y u do dis peer?!
2380                                 c.stats().incrementPiecesDirtiedBad()
2381                         }
2382
2383                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
2384                         for c := range p.dirtiers {
2385                                 if !c.trusted {
2386                                         bannableTouchers = append(bannableTouchers, c)
2387                                 }
2388                         }
2389                         t.clearPieceTouchers(piece)
2390                         slices.Sort(bannableTouchers, connLessTrusted)
2391
2392                         if t.cl.config.Debug {
2393                                 t.logger.Printf(
2394                                         "bannable conns by trust for piece %d: %v",
2395                                         piece,
2396                                         func() (ret []connectionTrust) {
2397                                                 for _, c := range bannableTouchers {
2398                                                         ret = append(ret, c.trust())
2399                                                 }
2400                                                 return
2401                                         }(),
2402                                 )
2403                         }
2404
2405                         if len(bannableTouchers) >= 1 {
2406                                 c := bannableTouchers[0]
2407                                 if len(bannableTouchers) != 1 {
2408                                         t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
2409                                 } else {
2410                                         // Turns out it's still useful to ban peers like this because if there's only a
2411                                         // single peer for a piece, and we never progress that piece to completion, we
2412                                         // will never smart-ban them. Discovered in
2413                                         // https://github.com/anacrolix/torrent/issues/715.
2414                                         t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
2415                                         c.ban()
2416                                 }
2417                         }
2418                 }
2419                 t.onIncompletePiece(piece)
2420                 p.Storage().MarkNotComplete()
2421         }
2422         t.updatePieceCompletion(piece)
2423 }
2424
2425 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
2426         start := t.pieceRequestIndexOffset(piece)
2427         end := start + t.pieceNumChunks(piece)
2428         for ri := start; ri < end; ri++ {
2429                 t.cancelRequest(ri)
2430         }
2431 }
2432
2433 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
2434         t.pendAllChunkSpecs(piece)
2435         t.cancelRequestsForPiece(piece)
2436         t.piece(piece).readerCond.Broadcast()
2437         for conn := range t.conns {
2438                 conn.have(piece)
2439                 t.maybeDropMutuallyCompletePeer(conn)
2440         }
2441 }
2442
2443 // Called when a piece is found to be not complete.
2444 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2445         if t.pieceAllDirty(piece) {
2446                 t.pendAllChunkSpecs(piece)
2447         }
2448         if !t.wantPieceIndex(piece) {
2449                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2450                 return
2451         }
2452         // We could drop any connections that we told we have a piece that we
2453         // don't here. But there's a test failure, and it seems clients don't care
2454         // if you request pieces that you already claim to have. Pruning bad
2455         // connections might just remove any connections that aren't treating us
2456         // favourably anyway.
2457
2458         // for c := range t.conns {
2459         //      if c.sentHave(piece) {
2460         //              c.drop()
2461         //      }
2462         // }
2463         t.iterPeers(func(conn *Peer) {
2464                 if conn.peerHasPiece(piece) {
2465                         conn.updateRequests("piece incomplete")
2466                 }
2467         })
2468 }
2469
2470 func (t *Torrent) tryCreateMorePieceHashers() {
2471         for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
2472         }
2473 }
2474
2475 func (t *Torrent) tryCreatePieceHasher() bool {
2476         if t.storage == nil {
2477                 return false
2478         }
2479         pi, ok := t.getPieceToHash()
2480         if !ok {
2481                 return false
2482         }
2483         p := t.piece(pi)
2484         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2485         p.hashing = true
2486         t.publishPieceStateChange(pi)
2487         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2488         t.storageLock.RLock()
2489         t.activePieceHashes++
2490         go t.pieceHasher(pi)
2491         return true
2492 }
2493
2494 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2495         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2496                 if t.piece(i).hashing {
2497                         return true
2498                 }
2499                 ret = i
2500                 ok = true
2501                 return false
2502         })
2503         return
2504 }
2505
2506 func (t *Torrent) dropBannedPeers() {
2507         t.iterPeers(func(p *Peer) {
2508                 remoteIp := p.remoteIp()
2509                 if remoteIp == nil {
2510                         if p.bannableAddr.Ok {
2511                                 t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
2512                         }
2513                         return
2514                 }
2515                 netipAddr := netip.MustParseAddr(remoteIp.String())
2516                 if Some(netipAddr) != p.bannableAddr {
2517                         t.logger.WithDefaultLevel(log.Debug).Printf(
2518                                 "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
2519                                 p, remoteIp, p.bannableAddr)
2520                 }
2521                 if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
2522                         // Should this be a close?
2523                         p.drop()
2524                         t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
2525                 }
2526         })
2527 }
2528
2529 func (t *Torrent) pieceHasher(index pieceIndex) {
2530         p := t.piece(index)
2531         // Do we really need to spell out that it's a copy error? If it's a failure to hash the hash
2532         // will just be wrong.
2533         correct, failedPeers, copyErr := t.hashPiece(index)
2534         switch copyErr {
2535         case nil, io.EOF:
2536         default:
2537                 t.logger.Levelf(
2538                         log.Warning,
2539                         "error hashing piece %v: %v", index, copyErr)
2540         }
2541         t.storageLock.RUnlock()
2542         t.cl.lock()
2543         defer t.cl.unlock()
2544         if correct {
2545                 for peer := range failedPeers {
2546                         t.cl.banPeerIP(peer.AsSlice())
2547                         t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
2548                 }
2549                 t.dropBannedPeers()
2550                 for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
2551                         t.smartBanCache.ForgetBlock(ri)
2552                 }
2553         }
2554         p.hashing = false
2555         t.pieceHashed(index, correct, copyErr)
2556         t.updatePiecePriority(index, "Torrent.pieceHasher")
2557         t.activePieceHashes--
2558         t.tryCreateMorePieceHashers()
2559 }
2560
2561 // Return the connections that touched a piece, and clear the entries while doing it.
2562 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2563         p := t.piece(pi)
2564         for c := range p.dirtiers {
2565                 delete(c.peerTouchedPieces, pi)
2566                 delete(p.dirtiers, c)
2567         }
2568 }
2569
2570 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2571         t.iterPeers(func(p *Peer) {
2572                 ret = append(ret, p)
2573         })
2574         return
2575 }
2576
2577 func (t *Torrent) queueInitialPieceCheck(i pieceIndex) {
2578         if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk {
2579                 t.queuePieceCheck(i)
2580         }
2581 }
2582
2583 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2584         piece := t.piece(pieceIndex)
2585         if piece.hash == nil && !piece.hashV2.Ok {
2586                 return
2587         }
2588         if piece.queuedForHash() {
2589                 return
2590         }
2591         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2592         t.publishPieceStateChange(pieceIndex)
2593         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2594         t.tryCreateMorePieceHashers()
2595 }
2596
2597 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2598 // before the Info is available.
2599 func (t *Torrent) VerifyData() {
2600         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2601                 t.Piece(i).VerifyData()
2602         }
2603 }
2604
2605 func (t *Torrent) connectingToPeerAddr(addrStr string) bool {
2606         return len(t.halfOpen[addrStr]) != 0
2607 }
2608
2609 func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool {
2610         addrStr := x.String()
2611         for c := range t.conns {
2612                 ra := c.RemoteAddr
2613                 if ra.String() == addrStr {
2614                         return true
2615                 }
2616         }
2617         return false
2618 }
2619
2620 func (t *Torrent) getHalfOpenPath(
2621         addrStr string,
2622         attemptKey outgoingConnAttemptKey,
2623 ) nestedmaps.Path[*PeerInfo] {
2624         return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey)
2625 }
2626
2627 func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) {
2628         path := t.getHalfOpenPath(addrStr, attemptKey)
2629         if path.Exists() {
2630                 panic("should be unique")
2631         }
2632         path.Set(attemptKey)
2633         t.cl.numHalfOpen++
2634 }
2635
2636 // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not
2637 // sure all the PeerInfo fields are being used.
2638 func initiateConn(
2639         opts outgoingConnOpts,
2640         ignoreLimits bool,
2641 ) {
2642         t := opts.t
2643         peer := opts.peerInfo
2644         if peer.Id == t.cl.peerID {
2645                 return
2646         }
2647         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2648                 return
2649         }
2650         addr := peer.Addr
2651         addrStr := addr.String()
2652         if !ignoreLimits {
2653                 if t.connectingToPeerAddr(addrStr) {
2654                         return
2655                 }
2656         }
2657         if t.hasPeerConnForAddr(addr) {
2658                 return
2659         }
2660         attemptKey := &peer
2661         t.addHalfOpen(addrStr, attemptKey)
2662         go t.cl.outgoingConnection(
2663                 opts,
2664                 attemptKey,
2665         )
2666 }
2667
2668 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2669 // quickly make one Client visible to the Torrent of another Client.
2670 func (t *Torrent) AddClientPeer(cl *Client) int {
2671         return t.AddPeers(func() (ps []PeerInfo) {
2672                 for _, la := range cl.ListenAddrs() {
2673                         ps = append(ps, PeerInfo{
2674                                 Addr:    la,
2675                                 Trusted: true,
2676                         })
2677                 }
2678                 return
2679         }())
2680 }
2681
2682 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2683 // connection.
2684 func (t *Torrent) allStats(f func(*ConnStats)) {
2685         f(&t.stats)
2686         f(&t.cl.connStats)
2687 }
2688
2689 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2690         return t.pieces[i].hashing
2691 }
2692
2693 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2694         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2695 }
2696
2697 func (t *Torrent) dialTimeout() time.Duration {
2698         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2699 }
2700
2701 func (t *Torrent) piece(i int) *Piece {
2702         return &t.pieces[i]
2703 }
2704
2705 func (t *Torrent) onWriteChunkErr(err error) {
2706         if t.userOnWriteChunkErr != nil {
2707                 go t.userOnWriteChunkErr(err)
2708                 return
2709         }
2710         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2711         t.disallowDataDownloadLocked()
2712 }
2713
2714 func (t *Torrent) DisallowDataDownload() {
2715         t.cl.lock()
2716         defer t.cl.unlock()
2717         t.disallowDataDownloadLocked()
2718 }
2719
2720 func (t *Torrent) disallowDataDownloadLocked() {
2721         t.dataDownloadDisallowed.Set()
2722         t.iterPeers(func(p *Peer) {
2723                 // Could check if peer request state is empty/not interested?
2724                 p.updateRequests("disallow data download")
2725                 p.cancelAllRequests()
2726         })
2727 }
2728
2729 func (t *Torrent) AllowDataDownload() {
2730         t.cl.lock()
2731         defer t.cl.unlock()
2732         t.dataDownloadDisallowed.Clear()
2733         t.iterPeers(func(p *Peer) {
2734                 p.updateRequests("allow data download")
2735         })
2736 }
2737
2738 // Enables uploading data, if it was disabled.
2739 func (t *Torrent) AllowDataUpload() {
2740         t.cl.lock()
2741         defer t.cl.unlock()
2742         t.dataUploadDisallowed = false
2743         t.iterPeers(func(p *Peer) {
2744                 p.updateRequests("allow data upload")
2745         })
2746 }
2747
2748 // Disables uploading data, if it was enabled.
2749 func (t *Torrent) DisallowDataUpload() {
2750         t.cl.lock()
2751         defer t.cl.unlock()
2752         t.dataUploadDisallowed = true
2753         for c := range t.conns {
2754                 // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
2755                 c.updateRequests("disallow data upload")
2756         }
2757 }
2758
2759 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2760 // or if nil, a critical message is logged, and data download is disabled.
2761 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2762         t.cl.lock()
2763         defer t.cl.unlock()
2764         t.userOnWriteChunkErr = f
2765 }
2766
2767 func (t *Torrent) iterPeers(f func(p *Peer)) {
2768         for pc := range t.conns {
2769                 f(&pc.Peer)
2770         }
2771         for _, ws := range t.webSeeds {
2772                 f(ws)
2773         }
2774 }
2775
2776 func (t *Torrent) callbacks() *Callbacks {
2777         return &t.cl.config.Callbacks
2778 }
2779
2780 type AddWebSeedsOpt func(*webseed.Client)
2781
2782 // Sets the WebSeed trailing path escaper for a webseed.Client.
2783 func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
2784         return func(c *webseed.Client) {
2785                 c.PathEscaper = custom
2786         }
2787 }
2788
2789 func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
2790         t.cl.lock()
2791         defer t.cl.unlock()
2792         for _, u := range urls {
2793                 t.addWebSeed(u, opts...)
2794         }
2795 }
2796
2797 func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
2798         if t.cl.config.DisableWebseeds {
2799                 return
2800         }
2801         if _, ok := t.webSeeds[url]; ok {
2802                 return
2803         }
2804         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2805         // right away. This value should be some multiple of the number of connections to a host. I
2806         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2807         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2808         // "https://webtorrent.io/torrents/".
2809         const maxRequests = 16
2810         ws := webseedPeer{
2811                 peer: Peer{
2812                         t:                        t,
2813                         outgoing:                 true,
2814                         Network:                  "http",
2815                         reconciledHandshakeStats: true,
2816                         // This should affect how often we have to recompute requests for this peer. Note that
2817                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2818                         // requests mark more often, so recomputation is probably sooner than with regular peer
2819                         // conns. ~4x maxRequests would be about right.
2820                         PeerMaxRequests: 128,
2821                         // TODO: Set ban prefix?
2822                         RemoteAddr: remoteAddrFromUrl(url),
2823                         callbacks:  t.callbacks(),
2824                 },
2825                 client: webseed.Client{
2826                         HttpClient: t.cl.httpClient,
2827                         Url:        url,
2828                         ResponseBodyWrapper: func(r io.Reader) io.Reader {
2829                                 return &rateLimitedReader{
2830                                         l: t.cl.config.DownloadRateLimiter,
2831                                         r: r,
2832                                 }
2833                         },
2834                 },
2835                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2836         }
2837         ws.peer.initRequestState()
2838         for _, opt := range opts {
2839                 opt(&ws.client)
2840         }
2841         ws.peer.initUpdateRequestsTimer()
2842         ws.requesterCond.L = t.cl.locker()
2843         for i := 0; i < maxRequests; i += 1 {
2844                 go ws.requester(i)
2845         }
2846         for _, f := range t.callbacks().NewPeer {
2847                 f(&ws.peer)
2848         }
2849         ws.peer.logger = t.logger.WithContextValue(&ws)
2850         ws.peer.peerImpl = &ws
2851         if t.haveInfo() {
2852                 ws.onGotInfo(t.info)
2853         }
2854         t.webSeeds[url] = &ws.peer
2855 }
2856
2857 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2858         t.iterPeers(func(p1 *Peer) {
2859                 if p1 == p {
2860                         active = true
2861                 }
2862         })
2863         return
2864 }
2865
2866 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2867         index := t.pieceIndexOfRequestIndex(ri)
2868         return Request{
2869                 pp.Integer(index),
2870                 t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2871         }
2872 }
2873
2874 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2875         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
2876 }
2877
2878 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2879         return RequestIndex(piece) * t.chunksPerRegularPiece()
2880 }
2881
2882 func (t *Torrent) updateComplete() {
2883         t.Complete.SetBool(t.haveAllPieces())
2884 }
2885
2886 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
2887         p := t.requestingPeer(r)
2888         if p != nil {
2889                 p.cancel(r)
2890         }
2891         // TODO: This is a check that an old invariant holds. It can be removed after some testing.
2892         //delete(t.pendingRequests, r)
2893         if _, ok := t.requestState[r]; ok {
2894                 panic("expected request state to be gone")
2895         }
2896         return p
2897 }
2898
2899 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
2900         return t.requestState[r].peer
2901 }
2902
2903 func (t *Torrent) addConnWithAllPieces(p *Peer) {
2904         if t.connsWithAllPieces == nil {
2905                 t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
2906         }
2907         t.connsWithAllPieces[p] = struct{}{}
2908 }
2909
2910 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
2911         _, ok := t.connsWithAllPieces[p]
2912         delete(t.connsWithAllPieces, p)
2913         return ok
2914 }
2915
2916 func (t *Torrent) numActivePeers() int {
2917         return len(t.conns) + len(t.webSeeds)
2918 }
2919
2920 func (t *Torrent) hasStorageCap() bool {
2921         f := t.storage.Capacity
2922         if f == nil {
2923                 return false
2924         }
2925         _, ok := (*f)()
2926         return ok
2927 }
2928
2929 func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
2930         return pieceIndex(ri / t.chunksPerRegularPiece())
2931 }
2932
2933 func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
2934         reuseIter *typedRoaring.Iterator[RequestIndex],
2935         piece pieceIndex,
2936         f func(RequestIndex),
2937 ) {
2938         reuseIter.Initialize(&t.dirtyChunks)
2939         pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
2940         iterBitmapUnsetInRange(
2941                 reuseIter,
2942                 pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
2943                 f,
2944         )
2945 }
2946
2947 type requestState struct {
2948         peer *Peer
2949         when time.Time
2950 }
2951
2952 // Returns an error if a received chunk is out of bounds in someway.
2953 func (t *Torrent) checkValidReceiveChunk(r Request) error {
2954         if !t.haveInfo() {
2955                 return errors.New("torrent missing info")
2956         }
2957         if int(r.Index) >= t.numPieces() {
2958                 return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
2959         }
2960         pieceLength := t.pieceLength(pieceIndex(r.Index))
2961         if r.Begin >= pieceLength {
2962                 return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
2963         }
2964         // We could check chunk lengths here, but chunk request size is not changed often, and tricky
2965         // for peers to manipulate as they need to send potentially large buffers to begin with. There
2966         // should be considerable checks elsewhere for this case due to the network overhead. We should
2967         // catch most of the overflow manipulation stuff by checking index and begin above.
2968         return nil
2969 }
2970
2971 func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
2972         for pc := range t.conns {
2973                 dialAddr, err := pc.remoteDialAddrPort()
2974                 if err != nil {
2975                         continue
2976                 }
2977                 if dialAddr != target {
2978                         continue
2979                 }
2980                 ret = append(ret, pc)
2981         }
2982         return
2983 }
2984
2985 func wrapUtHolepunchMsgForPeerConn(
2986         recipient *PeerConn,
2987         msg utHolepunch.Msg,
2988 ) pp.Message {
2989         extendedPayload, err := msg.MarshalBinary()
2990         if err != nil {
2991                 panic(err)
2992         }
2993         return pp.Message{
2994                 Type:            pp.Extended,
2995                 ExtendedID:      MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName),
2996                 ExtendedPayload: extendedPayload,
2997         }
2998 }
2999
3000 func sendUtHolepunchMsg(
3001         pc *PeerConn,
3002         msgType utHolepunch.MsgType,
3003         addrPort netip.AddrPort,
3004         errCode utHolepunch.ErrCode,
3005 ) {
3006         holepunchMsg := utHolepunch.Msg{
3007                 MsgType:  msgType,
3008                 AddrPort: addrPort,
3009                 ErrCode:  errCode,
3010         }
3011         incHolepunchMessagesSent(holepunchMsg)
3012         ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg)
3013         pc.write(ppMsg)
3014 }
3015
3016 func incHolepunchMessages(msg utHolepunch.Msg, verb string) {
3017         torrent.Add(
3018                 fmt.Sprintf(
3019                         "holepunch %v %v messages %v",
3020                         msg.MsgType,
3021                         addrPortProtocolStr(msg.AddrPort),
3022                         verb,
3023                 ),
3024                 1,
3025         )
3026 }
3027
3028 func incHolepunchMessagesReceived(msg utHolepunch.Msg) {
3029         incHolepunchMessages(msg, "received")
3030 }
3031
3032 func incHolepunchMessagesSent(msg utHolepunch.Msg) {
3033         incHolepunchMessages(msg, "sent")
3034 }
3035
3036 func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error {
3037         incHolepunchMessagesReceived(msg)
3038         switch msg.MsgType {
3039         case utHolepunch.Rendezvous:
3040                 t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
3041                 sendMsg := sendUtHolepunchMsg
3042                 senderAddrPort, err := sender.remoteDialAddrPort()
3043                 if err != nil {
3044                         sender.logger.Levelf(
3045                                 log.Warning,
3046                                 "error getting ut_holepunch rendezvous sender's dial address: %v",
3047                                 err,
3048                         )
3049                         // There's no better error code. The sender's address itself is invalid. I don't see
3050                         // this error message being appropriate anywhere else anyway.
3051                         sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
3052                 }
3053                 targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
3054                 if len(targets) == 0 {
3055                         sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
3056                         return nil
3057                 }
3058                 for _, pc := range targets {
3059                         if !pc.supportsExtension(utHolepunch.ExtensionName) {
3060                                 sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport)
3061                                 continue
3062                         }
3063                         sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
3064                         sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
3065                 }
3066                 return nil
3067         case utHolepunch.Connect:
3068                 holepunchAddr := msg.AddrPort
3069                 t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender)
3070                 if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) {
3071                         setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr)
3072                         if g.MapContains(t.cl.accepted, holepunchAddr) {
3073                                 setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
3074                         }
3075                 }
3076                 opts := outgoingConnOpts{
3077                         peerInfo: PeerInfo{
3078                                 Addr:         msg.AddrPort,
3079                                 Source:       PeerSourceUtHolepunch,
3080                                 PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(),
3081                         },
3082                         t: t,
3083                         // Don't attempt to start our own rendezvous if we fail to connect.
3084                         skipHolepunchRendezvous:  true,
3085                         receivedHolepunchConnect: true,
3086                         // Assume that the other end initiated the rendezvous, and will use our preferred
3087                         // encryption. So we will act normally.
3088                         HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
3089                 }
3090                 initiateConn(opts, true)
3091                 return nil
3092         case utHolepunch.Error:
3093                 torrent.Add("holepunch error messages received", 1)
3094                 t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
3095                 return nil
3096         default:
3097                 return fmt.Errorf("unhandled msg type %v", msg.MsgType)
3098         }
3099 }
3100
3101 func addrPortProtocolStr(addrPort netip.AddrPort) string {
3102         addr := addrPort.Addr()
3103         switch {
3104         case addr.Is4():
3105                 return "ipv4"
3106         case addr.Is6():
3107                 return "ipv6"
3108         default:
3109                 panic(addrPort)
3110         }
3111 }
3112
3113 func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error {
3114         rzsSent := 0
3115         for pc := range t.conns {
3116                 if !pc.supportsExtension(utHolepunch.ExtensionName) {
3117                         continue
3118                 }
3119                 if pc.supportsExtension(pp.ExtensionNamePex) {
3120                         if !g.MapContains(pc.pex.remoteLiveConns, addrPort) {
3121                                 continue
3122                         }
3123                 }
3124                 t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort)
3125                 sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0)
3126                 rzsSent++
3127         }
3128         if rzsSent == 0 {
3129                 return errors.New("no eligible relays")
3130         }
3131         return nil
3132 }
3133
3134 func (t *Torrent) numHalfOpenAttempts() (num int) {
3135         for _, attempts := range t.halfOpen {
3136                 num += len(attempts)
3137         }
3138         return
3139 }
3140
3141 func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
3142         cl := t.cl
3143         cl.rLock()
3144         defer cl.rUnlock()
3145         return t.dialTimeout()
3146 }
3147
3148 func (t *Torrent) canonicalShortInfohash() *infohash.T {
3149         if t.infoHash.Ok {
3150                 return &t.infoHash.Value
3151         }
3152         return t.infoHashV2.UnwrapPtr().ToShort()
3153 }
3154
3155 func (t *Torrent) eachShortInfohash(each func(short [20]byte)) {
3156         if t.infoHash.Value == *t.infoHashV2.Value.ToShort() {
3157                 // This includes zero values, since they both should not be zero. Plus Option should not
3158                 // allow non-zero values for None.
3159                 panic("v1 and v2 info hashes should not be the same")
3160         }
3161         if t.infoHash.Ok {
3162                 each(t.infoHash.Value)
3163         }
3164         if t.infoHashV2.Ok {
3165                 v2Short := *t.infoHashV2.Value.ToShort()
3166                 each(v2Short)
3167         }
3168 }
3169
3170 func (t *Torrent) getFileByPiecesRoot(hash [32]byte) *File {
3171         for _, f := range *t.files {
3172                 if f.piecesRoot.Unwrap() == hash {
3173                         return f
3174                 }
3175         }
3176         return nil
3177 }