]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Some fixes to tests/issue-798 main
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "hash"
11         "io"
12         "math/rand"
13         "net/netip"
14         "net/url"
15         "sort"
16         "strings"
17         "text/tabwriter"
18         "time"
19         "unsafe"
20
21         "github.com/RoaringBitmap/roaring"
22         "github.com/anacrolix/chansync"
23         "github.com/anacrolix/chansync/events"
24         "github.com/anacrolix/dht/v2"
25         . "github.com/anacrolix/generics"
26         g "github.com/anacrolix/generics"
27         "github.com/anacrolix/log"
28         "github.com/anacrolix/missinggo/slices"
29         "github.com/anacrolix/missinggo/v2"
30         "github.com/anacrolix/missinggo/v2/bitmap"
31         "github.com/anacrolix/missinggo/v2/pubsub"
32         "github.com/anacrolix/multiless"
33         "github.com/anacrolix/sync"
34         "github.com/pion/datachannel"
35         "golang.org/x/exp/maps"
36         "golang.org/x/sync/errgroup"
37
38         "github.com/anacrolix/torrent/bencode"
39         "github.com/anacrolix/torrent/internal/check"
40         "github.com/anacrolix/torrent/internal/nestedmaps"
41         "github.com/anacrolix/torrent/merkle"
42         "github.com/anacrolix/torrent/metainfo"
43         pp "github.com/anacrolix/torrent/peer_protocol"
44         utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
45         request_strategy "github.com/anacrolix/torrent/request-strategy"
46         "github.com/anacrolix/torrent/storage"
47         "github.com/anacrolix/torrent/tracker"
48         typedRoaring "github.com/anacrolix/torrent/typed-roaring"
49         "github.com/anacrolix/torrent/types/infohash"
50         infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2"
51         "github.com/anacrolix/torrent/webseed"
52         "github.com/anacrolix/torrent/webtorrent"
53 )
54
55 // Maintains state of torrent within a Client. Many methods should not be called before the info is
56 // available, see .Info and .GotInfo.
57 type Torrent struct {
58         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
59         // alignment. See #262.
60         stats  ConnStats
61         cl     *Client
62         logger log.Logger
63
64         networkingEnabled      chansync.Flag
65         dataDownloadDisallowed chansync.Flag
66         dataUploadDisallowed   bool
67         userOnWriteChunkErr    func(error)
68
69         closed  chansync.SetOnce
70         onClose []func()
71
72         infoHash   g.Option[metainfo.Hash]
73         infoHashV2 g.Option[infohash_v2.T]
74
75         pieces []Piece
76
77         // The order pieces are requested if there's no stronger reason like availability or priority.
78         pieceRequestOrder []int
79         // Values are the piece indices that changed.
80         pieceStateChanges pubsub.PubSub[PieceStateChange]
81         // The size of chunks to request from peers over the wire. This is
82         // normally 16KiB by convention these days.
83         chunkSize pp.Integer
84         chunkPool sync.Pool
85         // Total length of the torrent in bytes. Stored because it's not O(1) to
86         // get this from the info dict.
87         _length Option[int64]
88
89         // The storage to open when the info dict becomes available.
90         storageOpener *storage.Client
91         // Storage for torrent data.
92         storage *storage.Torrent
93         // Read-locked for using storage, and write-locked for Closing.
94         storageLock sync.RWMutex
95
96         // TODO: Only announce stuff is used?
97         metainfo metainfo.MetaInfo
98
99         // The info dict. nil if we don't have it (yet).
100         info  *metainfo.Info
101         files *[]*File
102
103         _chunksPerRegularPiece chunkIndexType
104
105         webSeeds map[string]*Peer
106         // Active peer connections, running message stream loops. TODO: Make this
107         // open (not-closed) connections only.
108         conns               map[*PeerConn]struct{}
109         maxEstablishedConns int
110         // Set of addrs to which we're attempting to connect. Connections are
111         // half-open until all handshakes are completed.
112         halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo
113
114         // Reserve of peers to connect to. A peer can be both here and in the
115         // active connections if were told about the peer after connecting with
116         // them. That encourages us to reconnect to peers that are well known in
117         // the swarm.
118         peers prioritizedPeers
119         // Whether we want to know more peers.
120         wantPeersEvent missinggo.Event
121         // An announcer for each tracker URL.
122         trackerAnnouncers map[torrentTrackerAnnouncerKey]torrentTrackerAnnouncer
123         // How many times we've initiated a DHT announce. TODO: Move into stats.
124         numDHTAnnounces int
125
126         // Name used if the info name isn't available. Should be cleared when the
127         // Info does become available.
128         nameMu      sync.RWMutex
129         displayName string
130
131         // The bencoded bytes of the info dict. This is actively manipulated if
132         // the info bytes aren't initially available, and we try to fetch them
133         // from peers.
134         metadataBytes []byte
135         // Each element corresponds to the 16KiB metadata pieces. If true, we have
136         // received that piece.
137         metadataCompletedChunks []bool
138         metadataChanged         sync.Cond
139
140         // Closed when .Info is obtained.
141         gotMetainfoC chan struct{}
142
143         readers                map[*reader]struct{}
144         _readerNowPieces       bitmap.Bitmap
145         _readerReadaheadPieces bitmap.Bitmap
146
147         // A cache of pieces we need to get. Calculated from various piece and file priorities and
148         // completion states elsewhere. Includes piece data and piece v2 hashes.
149         _pendingPieces roaring.Bitmap
150         // A cache of completed piece indices.
151         _completedPieces roaring.Bitmap
152         // Pieces that need to be hashed.
153         piecesQueuedForHash       bitmap.Bitmap
154         activePieceHashes         int
155         initialPieceCheckDisabled bool
156
157         connsWithAllPieces map[*Peer]struct{}
158
159         requestState map[RequestIndex]requestState
160         // Chunks we've written to since the corresponding piece was last checked.
161         dirtyChunks typedRoaring.Bitmap[RequestIndex]
162
163         pex pexState
164
165         // Is On when all pieces are complete.
166         Complete chansync.Flag
167
168         // Torrent sources in use keyed by the source string.
169         activeSources sync.Map
170         sourcesLogger log.Logger
171
172         smartBanCache smartBanCache
173
174         // Large allocations reused between request state updates.
175         requestPieceStates []g.Option[request_strategy.PieceRequestOrderState]
176         requestIndexes     []RequestIndex
177
178         disableTriggers bool
179 }
180
181 type torrentTrackerAnnouncerKey struct {
182         shortInfohash [20]byte
183         url           string
184 }
185
186 type outgoingConnAttemptKey = *PeerInfo
187
188 func (t *Torrent) length() int64 {
189         return t._length.Value
190 }
191
192 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
193         // This could be done with roaring.BitSliceIndexing.
194         t.iterPeers(func(peer *Peer) {
195                 if _, ok := t.connsWithAllPieces[peer]; ok {
196                         return
197                 }
198                 if peer.peerHasPiece(i) {
199                         count++
200                 }
201         })
202         return
203 }
204
205 func (t *Torrent) decPieceAvailability(i pieceIndex) {
206         if !t.haveInfo() {
207                 return
208         }
209         p := t.piece(i)
210         if p.relativeAvailability <= 0 {
211                 panic(p.relativeAvailability)
212         }
213         p.relativeAvailability--
214         t.updatePieceRequestOrderPiece(i)
215 }
216
217 func (t *Torrent) incPieceAvailability(i pieceIndex) {
218         // If we don't the info, this should be reconciled when we do.
219         if t.haveInfo() {
220                 p := t.piece(i)
221                 p.relativeAvailability++
222                 t.updatePieceRequestOrderPiece(i)
223         }
224 }
225
226 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
227         return t._readerNowPieces
228 }
229
230 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
231         return t._readerReadaheadPieces
232 }
233
234 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
235         return t.piece(i).ignoreForRequests()
236 }
237
238 // Returns a channel that is closed when the Torrent is closed.
239 func (t *Torrent) Closed() events.Done {
240         return t.closed.Done()
241 }
242
243 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
244 // pending, and half-open peers.
245 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
246         // Add pending peers to the list
247         t.peers.Each(func(peer PeerInfo) {
248                 ks = append(ks, peer)
249         })
250
251         // Add half-open peers to the list
252         for _, attempts := range t.halfOpen {
253                 for _, peer := range attempts {
254                         ks = append(ks, *peer)
255                 }
256         }
257
258         // Add active peers to the list
259         t.cl.rLock()
260         defer t.cl.rUnlock()
261         for conn := range t.conns {
262                 ks = append(ks, PeerInfo{
263                         Id:     conn.PeerID,
264                         Addr:   conn.RemoteAddr,
265                         Source: conn.Discovery,
266                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
267                         // > But if we're not connected to them with an encrypted connection, I couldn't say
268                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
269                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
270                         // > connection. It's probably easiest to do the latter for now.
271                         // https://github.com/anacrolix/torrent/pull/188
272                         SupportsEncryption: conn.headerEncrypted,
273                 })
274         }
275
276         return
277 }
278
279 func (t *Torrent) setChunkSize(size pp.Integer) {
280         t.chunkSize = size
281         t.chunkPool = sync.Pool{
282                 New: func() interface{} {
283                         b := make([]byte, size)
284                         return &b
285                 },
286         }
287 }
288
289 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
290         return t._completedPieces.Contains(bitmap.BitIndex(piece))
291 }
292
293 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
294         if t.storage == nil {
295                 return storage.Completion{Complete: false, Ok: true}
296         }
297         return t.pieces[piece].Storage().Completion()
298 }
299
300 // There's a connection to that address already.
301 func (t *Torrent) addrActive(addr string) bool {
302         if _, ok := t.halfOpen[addr]; ok {
303                 return true
304         }
305         for c := range t.conns {
306                 ra := c.RemoteAddr
307                 if ra.String() == addr {
308                         return true
309                 }
310         }
311         return false
312 }
313
314 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
315         return t.appendConns(ret, func(conn *PeerConn) bool {
316                 return !conn.closed.IsSet()
317         })
318 }
319
320 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
321         for c := range t.conns {
322                 if f(c) {
323                         ret = append(ret, c)
324                 }
325         }
326         return ret
327 }
328
329 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
330         cl := t.cl
331         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
332         if t.closed.IsSet() {
333                 return false
334         }
335         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
336                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
337                         torrent.Add("peers not added because of bad addr", 1)
338                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
339                         return false
340                 }
341         }
342         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
343                 torrent.Add("peers replaced", 1)
344                 if !replaced.equal(p) {
345                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
346                         added = true
347                 }
348         } else {
349                 added = true
350         }
351         t.openNewConns()
352         for t.peers.Len() > cl.config.TorrentPeersHighWater {
353                 _, ok := t.peers.DeleteMin()
354                 if ok {
355                         torrent.Add("excess reserve peers discarded", 1)
356                 }
357         }
358         return
359 }
360
361 func (t *Torrent) invalidateMetadata() {
362         for i := 0; i < len(t.metadataCompletedChunks); i++ {
363                 t.metadataCompletedChunks[i] = false
364         }
365         t.nameMu.Lock()
366         t.info = nil
367         t.nameMu.Unlock()
368 }
369
370 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
371         if t.haveInfo() {
372                 return
373         }
374         if index >= len(t.metadataCompletedChunks) {
375                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
376                 return
377         }
378         copy(t.metadataBytes[(1<<14)*index:], data)
379         t.metadataCompletedChunks[index] = true
380 }
381
382 func (t *Torrent) metadataPieceCount() int {
383         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
384 }
385
386 func (t *Torrent) haveMetadataPiece(piece int) bool {
387         if t.haveInfo() {
388                 return (1<<14)*piece < len(t.metadataBytes)
389         } else {
390                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
391         }
392 }
393
394 func (t *Torrent) metadataSize() int {
395         return len(t.metadataBytes)
396 }
397
398 func (t *Torrent) makePieces() {
399         t.pieces = make([]Piece, t.info.NumPieces())
400         for i := range t.pieces {
401                 piece := &t.pieces[i]
402                 piece.t = t
403                 piece.index = i
404                 piece.noPendingWrites.L = &piece.pendingWritesMutex
405                 if t.info.HasV1() {
406                         piece.hash = (*metainfo.Hash)(unsafe.Pointer(
407                                 unsafe.SliceData(t.info.Pieces[i*sha1.Size : (i+1)*sha1.Size])))
408                 }
409                 files := *t.files
410                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
411                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
412                 piece.files = files[beginFile:endFile]
413         }
414 }
415
416 func (t *Torrent) addPieceLayersLocked(layers map[string]string) (errs []error) {
417         if layers == nil {
418                 return
419         }
420 files:
421         for _, f := range *t.files {
422                 if f.numPieces() <= 1 {
423                         continue
424                 }
425                 if !f.piecesRoot.Ok {
426                         err := fmt.Errorf("no piece root set for file %v", f)
427                         errs = append(errs, err)
428                         continue files
429                 }
430                 compactLayer, ok := layers[string(f.piecesRoot.Value[:])]
431                 var hashes [][32]byte
432                 if ok {
433                         var err error
434                         hashes, err = merkle.CompactLayerToSliceHashes(compactLayer)
435                         if err != nil {
436                                 err = fmt.Errorf("bad piece layers for file %q: %w", f, err)
437                                 errs = append(errs, err)
438                                 continue files
439                         }
440                 } else {
441                         if f.length > t.info.PieceLength {
442                                 // BEP 52 is pretty strongly worded about this, even though we should be able to
443                                 // recover: If a v2 torrent is added by magnet link or infohash, we need to fetch
444                                 // piece layers ourselves anyway, and that's how we can recover from this.
445                                 t.logger.Levelf(log.Warning, "no piece layers for file %q", f)
446                         }
447                         continue files
448                 }
449                 if len(hashes) != f.numPieces() {
450                         errs = append(
451                                 errs,
452                                 fmt.Errorf("file %q: got %v hashes expected %v", f, len(hashes), f.numPieces()),
453                         )
454                         continue files
455                 }
456                 root := merkle.RootWithPadHash(hashes, metainfo.HashForPiecePad(t.info.PieceLength))
457                 if root != f.piecesRoot.Value {
458                         errs = append(errs, fmt.Errorf("%v: expected hash %x got %x", f, f.piecesRoot.Value, root))
459                         continue files
460                 }
461                 for i := range f.numPieces() {
462                         pi := f.BeginPieceIndex() + i
463                         p := t.piece(pi)
464                         p.setV2Hash(hashes[i])
465                 }
466         }
467         return
468 }
469
470 func (t *Torrent) AddPieceLayers(layers map[string]string) (errs []error) {
471         t.cl.lock()
472         defer t.cl.unlock()
473         return t.addPieceLayersLocked(layers)
474 }
475
476 // Returns the index of the first file containing the piece. files must be
477 // ordered by offset.
478 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
479         for i, f := range files {
480                 if f.offset+f.length > pieceOffset {
481                         return i
482                 }
483         }
484         return 0
485 }
486
487 // Returns the index after the last file containing the piece. files must be
488 // ordered by offset.
489 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
490         for i, f := range files {
491                 if f.offset >= pieceEndOffset {
492                         return i
493                 }
494         }
495         return len(files)
496 }
497
498 func (t *Torrent) cacheLength() {
499         var l int64
500         for _, f := range t.info.UpvertedFiles() {
501                 l += f.Length
502         }
503         t._length = Some(l)
504 }
505
506 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
507 // separately.
508 func (t *Torrent) setInfo(info *metainfo.Info) error {
509         if err := validateInfo(info); err != nil {
510                 return fmt.Errorf("bad info: %s", err)
511         }
512         if t.storageOpener != nil {
513                 var err error
514                 t.storage, err = t.storageOpener.OpenTorrent(info, *t.canonicalShortInfohash())
515                 if err != nil {
516                         return fmt.Errorf("error opening torrent storage: %s", err)
517                 }
518         }
519         t.nameMu.Lock()
520         t.info = info
521         t.nameMu.Unlock()
522         t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
523         t.updateComplete()
524         t.displayName = "" // Save a few bytes lol.
525         t.initFiles()
526         t.cacheLength()
527         t.makePieces()
528         return nil
529 }
530
531 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
532         return request_strategy.PieceRequestOrderKey{
533                 InfoHash: *t.canonicalShortInfohash(),
534                 Index:    i,
535         }
536 }
537
538 // This seems to be all the follow-up tasks after info is set, that can't fail.
539 func (t *Torrent) onSetInfo() {
540         t.pieceRequestOrder = rand.Perm(t.numPieces())
541         t.initPieceRequestOrder()
542         MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
543         for i := range t.pieces {
544                 p := &t.pieces[i]
545                 // Need to add relativeAvailability before updating piece completion, as that may result in conns
546                 // being dropped.
547                 if p.relativeAvailability != 0 {
548                         panic(p.relativeAvailability)
549                 }
550                 p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
551                 t.addRequestOrderPiece(i)
552                 t.updatePieceCompletion(i)
553                 t.queueInitialPieceCheck(i)
554         }
555         t.cl.event.Broadcast()
556         close(t.gotMetainfoC)
557         t.updateWantPeersEvent()
558         t.requestState = make(map[RequestIndex]requestState)
559         t.tryCreateMorePieceHashers()
560         t.iterPeers(func(p *Peer) {
561                 p.onGotInfo(t.info)
562                 p.updateRequests("onSetInfo")
563         })
564 }
565
566 // Checks the info bytes hash to expected values. Fills in any missing infohashes.
567 func (t *Torrent) hashInfoBytes(b []byte, info *metainfo.Info) error {
568         v1Hash := infohash.HashBytes(b)
569         v2Hash := infohash_v2.HashBytes(b)
570         cl := t.cl
571         if t.infoHash.Ok && !t.infoHashV2.Ok {
572                 if v1Hash == t.infoHash.Value {
573                         if info.HasV2() {
574                                 t.infoHashV2.Set(v2Hash)
575                                 cl.torrentsByShortHash[*v2Hash.ToShort()] = t
576                         }
577                 } else if *v2Hash.ToShort() == t.infoHash.Value {
578                         if !info.HasV2() {
579                                 return errors.New("invalid v2 info")
580                         }
581                         t.infoHashV2.Set(v2Hash)
582                         t.infoHash.SetNone()
583                         if info.HasV1() {
584                                 cl.torrentsByShortHash[v1Hash] = t
585                                 t.infoHash.Set(v1Hash)
586                         }
587                 }
588         } else if t.infoHash.Ok && t.infoHashV2.Ok {
589                 if v1Hash != t.infoHash.Value {
590                         return errors.New("incorrect v1 infohash")
591                 }
592                 if v2Hash != t.infoHashV2.Value {
593                         return errors.New("incorrect v2 infohash")
594                 }
595         } else if !t.infoHash.Ok && t.infoHashV2.Ok {
596                 if v2Hash != t.infoHashV2.Value {
597                         return errors.New("incorrect v2 infohash")
598                 }
599                 if info.HasV1() {
600                         t.infoHash.Set(v1Hash)
601                         cl.torrentsByShortHash[v1Hash] = t
602                 }
603         } else {
604                 panic("no expected infohashes")
605         }
606         return nil
607 }
608
609 // Called when metadata for a torrent becomes available.
610 func (t *Torrent) setInfoBytesLocked(b []byte) (err error) {
611         var info metainfo.Info
612         err = bencode.Unmarshal(b, &info)
613         if err != nil {
614                 err = fmt.Errorf("unmarshalling info bytes: %w", err)
615                 return
616         }
617         err = t.hashInfoBytes(b, &info)
618         if err != nil {
619                 return
620         }
621         t.metadataBytes = b
622         t.metadataCompletedChunks = nil
623         if t.info != nil {
624                 return nil
625         }
626         if err := t.setInfo(&info); err != nil {
627                 return err
628         }
629         t.onSetInfo()
630         return nil
631 }
632
633 func (t *Torrent) haveAllMetadataPieces() bool {
634         if t.haveInfo() {
635                 return true
636         }
637         if t.metadataCompletedChunks == nil {
638                 return false
639         }
640         for _, have := range t.metadataCompletedChunks {
641                 if !have {
642                         return false
643                 }
644         }
645         return true
646 }
647
648 // TODO: Propagate errors to disconnect peer.
649 func (t *Torrent) setMetadataSize(size int) (err error) {
650         if t.haveInfo() {
651                 // We already know the correct metadata size.
652                 return
653         }
654         if uint32(size) > maxMetadataSize {
655                 return log.WithLevel(log.Warning, errors.New("bad size"))
656         }
657         if len(t.metadataBytes) == size {
658                 return
659         }
660         t.metadataBytes = make([]byte, size)
661         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
662         t.metadataChanged.Broadcast()
663         for c := range t.conns {
664                 c.requestPendingMetadata()
665         }
666         return
667 }
668
669 // The current working name for the torrent. Either the name in the info dict,
670 // or a display name given such as by the dn value in a magnet link, or "".
671 func (t *Torrent) name() string {
672         t.nameMu.RLock()
673         defer t.nameMu.RUnlock()
674         if t.haveInfo() {
675                 return t.info.BestName()
676         }
677         if t.displayName != "" {
678                 return t.displayName
679         }
680         return "infohash:" + t.canonicalShortInfohash().HexString()
681 }
682
683 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
684         p := &t.pieces[index]
685         ret.Priority = p.effectivePriority()
686         ret.Completion = p.completion()
687         ret.QueuedForHash = p.queuedForHash()
688         ret.Hashing = p.hashing
689         ret.Checking = ret.QueuedForHash || ret.Hashing
690         ret.Marking = p.marking
691         if !ret.Complete && t.piecePartiallyDownloaded(index) {
692                 ret.Partial = true
693         }
694         if t.info.HasV2() && !p.hashV2.Ok && p.hasPieceLayer() {
695                 ret.MissingPieceLayerHash = true
696         }
697         return
698 }
699
700 func (t *Torrent) metadataPieceSize(piece int) int {
701         return metadataPieceSize(len(t.metadataBytes), piece)
702 }
703
704 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
705         return pp.Message{
706                 Type:       pp.Extended,
707                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
708                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
709                         Piece:     piece,
710                         TotalSize: len(t.metadataBytes),
711                         Type:      msgType,
712                 }), data...),
713         }
714 }
715
716 type pieceAvailabilityRun struct {
717         Count        pieceIndex
718         Availability int
719 }
720
721 func (me pieceAvailabilityRun) String() string {
722         return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
723 }
724
725 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
726         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
727                 ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
728         })
729         for i := range t.pieces {
730                 rle.Append(t.pieces[i].availability(), 1)
731         }
732         rle.Flush()
733         return
734 }
735
736 func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
737         freqs = make([]int, t.numActivePeers()+1)
738         for i := range t.pieces {
739                 freqs[t.piece(i).availability()]++
740         }
741         return
742 }
743
744 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
745         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
746                 ret = append(ret, PieceStateRun{
747                         PieceState: el.(PieceState),
748                         Length:     int(count),
749                 })
750         })
751         for index := range t.pieces {
752                 rle.Append(t.pieceState(index), 1)
753         }
754         rle.Flush()
755         return
756 }
757
758 // Produces a small string representing a PieceStateRun.
759 func (psr PieceStateRun) String() (ret string) {
760         ret = fmt.Sprintf("%d", psr.Length)
761         ret += func() string {
762                 switch psr.Priority {
763                 case PiecePriorityNext:
764                         return "N"
765                 case PiecePriorityNormal:
766                         return "."
767                 case PiecePriorityReadahead:
768                         return "R"
769                 case PiecePriorityNow:
770                         return "!"
771                 case PiecePriorityHigh:
772                         return "H"
773                 default:
774                         return ""
775                 }
776         }()
777         if psr.Hashing {
778                 ret += "H"
779         }
780         if psr.QueuedForHash {
781                 ret += "Q"
782         }
783         if psr.Marking {
784                 ret += "M"
785         }
786         if psr.Partial {
787                 ret += "P"
788         }
789         if psr.Complete {
790                 ret += "C"
791         }
792         if !psr.Ok {
793                 ret += "?"
794         }
795         if psr.MissingPieceLayerHash {
796                 ret += "h"
797         }
798         return
799 }
800
801 func (t *Torrent) writeStatus(w io.Writer) {
802         if t.infoHash.Ok {
803                 fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.Value.HexString())
804         }
805         if t.infoHashV2.Ok {
806                 fmt.Fprintf(w, "Infohash v2: %s\n", t.infoHashV2.Value.HexString())
807         }
808         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
809         if !t.haveInfo() {
810                 fmt.Fprintf(w, "Metadata have: ")
811                 for _, h := range t.metadataCompletedChunks {
812                         fmt.Fprintf(w, "%c", func() rune {
813                                 if h {
814                                         return 'H'
815                                 } else {
816                                         return '.'
817                                 }
818                         }())
819                 }
820                 fmt.Fprintln(w)
821         }
822         fmt.Fprintf(w, "Piece length: %s\n",
823                 func() string {
824                         if t.haveInfo() {
825                                 return fmt.Sprintf("%v (%v chunks)",
826                                         t.usualPieceSize(),
827                                         float64(t.usualPieceSize())/float64(t.chunkSize))
828                         } else {
829                                 return "no info"
830                         }
831                 }(),
832         )
833         if t.info != nil {
834                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
835                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
836                 // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
837                 // availability frequencies instead.
838                 if false {
839                         fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
840                                 for _, run := range t.pieceAvailabilityRuns() {
841                                         ret = append(ret, run.String())
842                                 }
843                                 return
844                         }(), " "))
845                 }
846                 fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
847                         func() (ret []string) {
848                                 for avail, freq := range t.pieceAvailabilityFrequencies() {
849                                         if freq == 0 {
850                                                 continue
851                                         }
852                                         ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
853                                 }
854                                 return
855                         }(),
856                         ", "))
857         }
858         fmt.Fprintf(w, "Reader Pieces:")
859         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
860                 fmt.Fprintf(w, " %d:%d", begin, end)
861                 return true
862         })
863         fmt.Fprintln(w)
864
865         fmt.Fprintf(w, "Enabled trackers:\n")
866         func() {
867                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
868                 fmt.Fprintf(tw, "    URL\tExtra\n")
869                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
870                         lu := l.URL()
871                         ru := r.URL()
872                         var luns, runs url.URL = *lu, *ru
873                         luns.Scheme = ""
874                         runs.Scheme = ""
875                         var ml missinggo.MultiLess
876                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
877                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
878                         return ml.Less()
879                 }).([]torrentTrackerAnnouncer) {
880                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
881                 }
882                 tw.Flush()
883         }()
884
885         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
886
887         dumpStats(w, t.statsLocked())
888
889         fmt.Fprintf(w, "webseeds:\n")
890         t.writePeerStatuses(w, maps.Values(t.webSeeds))
891
892         peerConns := maps.Keys(t.conns)
893         // Peers without priorities first, then those with. I'm undecided about how to order peers
894         // without priorities.
895         sort.Slice(peerConns, func(li, ri int) bool {
896                 l := peerConns[li]
897                 r := peerConns[ri]
898                 ml := multiless.New()
899                 lpp := g.ResultFromTuple(l.peerPriority()).ToOption()
900                 rpp := g.ResultFromTuple(r.peerPriority()).ToOption()
901                 ml = ml.Bool(lpp.Ok, rpp.Ok)
902                 ml = ml.Uint32(rpp.Value, lpp.Value)
903                 return ml.Less()
904         })
905
906         fmt.Fprintf(w, "%v peer conns:\n", len(peerConns))
907         t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer {
908                 return &pc.Peer
909         }))
910 }
911
912 func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) {
913         var buf bytes.Buffer
914         for _, c := range peers {
915                 fmt.Fprintf(w, "- ")
916                 buf.Reset()
917                 c.writeStatus(&buf)
918                 w.Write(bytes.TrimRight(
919                         bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n  ")),
920                         " "))
921         }
922 }
923
924 func (t *Torrent) haveInfo() bool {
925         return t.info != nil
926 }
927
928 // Returns a run-time generated MetaInfo that includes the info bytes and
929 // announce-list as currently known to the client.
930 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
931         return metainfo.MetaInfo{
932                 CreationDate: time.Now().Unix(),
933                 Comment:      "dynamic metainfo from client",
934                 CreatedBy:    "https://github.com/anacrolix/torrent",
935                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
936                 InfoBytes: func() []byte {
937                         if t.haveInfo() {
938                                 return t.metadataBytes
939                         } else {
940                                 return nil
941                         }
942                 }(),
943                 UrlList: func() []string {
944                         ret := make([]string, 0, len(t.webSeeds))
945                         for url := range t.webSeeds {
946                                 ret = append(ret, url)
947                         }
948                         return ret
949                 }(),
950                 PieceLayers: t.pieceLayers(),
951         }
952 }
953
954 // Returns a count of bytes that are not complete in storage, and not pending being written to
955 // storage. This value is from the perspective of the download manager, and may not agree with the
956 // actual state in storage. If you want read data synchronously you should use a Reader. See
957 // https://github.com/anacrolix/torrent/issues/828.
958 func (t *Torrent) BytesMissing() (n int64) {
959         t.cl.rLock()
960         n = t.bytesMissingLocked()
961         t.cl.rUnlock()
962         return
963 }
964
965 func (t *Torrent) bytesMissingLocked() int64 {
966         return t.bytesLeft()
967 }
968
969 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
970         roaring.Flip(b, 0, end).Iterate(cb)
971 }
972
973 func (t *Torrent) bytesLeft() (left int64) {
974         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
975                 p := t.piece(pieceIndex(x))
976                 left += int64(p.length() - p.numDirtyBytes())
977                 return true
978         })
979         return
980 }
981
982 // Bytes left to give in tracker announces.
983 func (t *Torrent) bytesLeftAnnounce() int64 {
984         if t.haveInfo() {
985                 return t.bytesLeft()
986         } else {
987                 return -1
988         }
989 }
990
991 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
992         if t.pieceComplete(piece) {
993                 return false
994         }
995         if t.pieceAllDirty(piece) {
996                 return false
997         }
998         return t.pieces[piece].hasDirtyChunks()
999 }
1000
1001 func (t *Torrent) usualPieceSize() int {
1002         return int(t.info.PieceLength)
1003 }
1004
1005 func (t *Torrent) numPieces() pieceIndex {
1006         return t.info.NumPieces()
1007 }
1008
1009 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
1010         return pieceIndex(t._completedPieces.GetCardinality())
1011 }
1012
1013 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
1014         if !t.closed.Set() {
1015                 err = errors.New("already closed")
1016                 return
1017         }
1018         for _, f := range t.onClose {
1019                 f()
1020         }
1021         if t.storage != nil {
1022                 wg.Add(1)
1023                 go func() {
1024                         defer wg.Done()
1025                         t.storageLock.Lock()
1026                         defer t.storageLock.Unlock()
1027                         if f := t.storage.Close; f != nil {
1028                                 err1 := f()
1029                                 if err1 != nil {
1030                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
1031                                 }
1032                         }
1033                 }()
1034         }
1035         t.iterPeers(func(p *Peer) {
1036                 p.close()
1037         })
1038         if t.storage != nil {
1039                 t.deletePieceRequestOrder()
1040         }
1041         t.assertAllPiecesRelativeAvailabilityZero()
1042         t.pex.Reset()
1043         t.cl.event.Broadcast()
1044         t.pieceStateChanges.Close()
1045         t.updateWantPeersEvent()
1046         return
1047 }
1048
1049 func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
1050         for i := range t.pieces {
1051                 p := t.piece(i)
1052                 if p.relativeAvailability != 0 {
1053                         panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
1054                 }
1055         }
1056 }
1057
1058 func (t *Torrent) requestOffset(r Request) int64 {
1059         return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
1060 }
1061
1062 // Return the request that would include the given offset into the torrent data. Returns !ok if
1063 // there is no such request.
1064 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
1065         return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
1066 }
1067
1068 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
1069         //defer perf.ScopeTimerErr(&err)()
1070         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
1071         if err == nil && n != len(data) {
1072                 err = io.ErrShortWrite
1073         }
1074         return err
1075 }
1076
1077 func (t *Torrent) bitfield() (bf []bool) {
1078         bf = make([]bool, t.numPieces())
1079         t._completedPieces.Iterate(func(piece uint32) (again bool) {
1080                 bf[piece] = true
1081                 return true
1082         })
1083         return
1084 }
1085
1086 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
1087         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
1088 }
1089
1090 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
1091         return t._chunksPerRegularPiece
1092 }
1093
1094 func (t *Torrent) numChunks() RequestIndex {
1095         if t.numPieces() == 0 {
1096                 return 0
1097         }
1098         return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
1099 }
1100
1101 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
1102         t.dirtyChunks.RemoveRange(
1103                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
1104                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
1105 }
1106
1107 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
1108         if t.info.PieceLength == 0 {
1109                 // There will be no variance amongst pieces. Only pain.
1110                 return 0
1111         }
1112         if t.info.FilesArePieceAligned() {
1113                 p := t.piece(piece)
1114                 file := p.mustGetOnlyFile()
1115                 if piece == file.EndPieceIndex()-1 {
1116                         return pp.Integer(file.length - (p.torrentBeginOffset() - file.offset))
1117                 }
1118                 return pp.Integer(t.usualPieceSize())
1119         }
1120         if piece == t.numPieces()-1 {
1121                 ret := pp.Integer(t.length() % t.info.PieceLength)
1122                 if ret != 0 {
1123                         return ret
1124                 }
1125         }
1126         return pp.Integer(t.info.PieceLength)
1127 }
1128
1129 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
1130         return &blockCheckingWriter{
1131                 cache:        &t.smartBanCache,
1132                 requestIndex: t.pieceRequestIndexOffset(piece),
1133                 chunkSize:    t.chunkSize.Int(),
1134         }
1135 }
1136
1137 func (t *Torrent) hashPiece(piece pieceIndex) (
1138         correct bool,
1139         // These are peers that sent us blocks that differ from what we hash here.
1140         differingPeers map[bannableAddr]struct{},
1141         err error,
1142 ) {
1143         p := t.piece(piece)
1144         p.waitNoPendingWrites()
1145         storagePiece := p.Storage()
1146
1147         if p.hash != nil {
1148                 // Does the backend want to do its own hashing?
1149                 if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
1150                         var sum metainfo.Hash
1151                         // log.Printf("A piece decided to self-hash: %d", piece)
1152                         sum, err = i.SelfHash()
1153                         correct = sum == *p.hash
1154                         // Can't do smart banning without reading the piece. The smartBanCache is still cleared
1155                         // in pieceHasher regardless.
1156                         return
1157                 }
1158                 h := pieceHash.New()
1159                 differingPeers, err = t.hashPieceWithSpecificHash(piece, h)
1160                 // For a hybrid torrent, we work with the v2 files, but if we use a v1 hash, we can assume that
1161                 // the pieces are padded with zeroes.
1162                 if t.info.FilesArePieceAligned() {
1163                         paddingLen := p.Info().V1Length() - p.Info().Length()
1164                         written, err := io.CopyN(h, zeroReader, paddingLen)
1165                         if written != paddingLen {
1166                                 panic(fmt.Sprintf(
1167                                         "piece %v: wrote %v bytes of padding, expected %v, error: %v",
1168                                         piece,
1169                                         written,
1170                                         paddingLen,
1171                                         err,
1172                                 ))
1173                         }
1174                 }
1175                 var sum [20]byte
1176                 sumExactly(sum[:], h.Sum)
1177                 correct = sum == *p.hash
1178         } else if p.hashV2.Ok {
1179                 h := merkle.NewHash()
1180                 differingPeers, err = t.hashPieceWithSpecificHash(piece, h)
1181                 var sum [32]byte
1182                 // What about the final piece in a torrent? From BEP 52: "The layer is chosen so that one
1183                 // hash covers piece length bytes.". Note that if a piece doesn't have a hash in piece
1184                 // layers it's because it's not larger than the piece length.
1185                 sumExactly(sum[:], func(b []byte) []byte {
1186                         return h.SumMinLength(b, int(t.info.PieceLength))
1187                 })
1188                 correct = sum == p.hashV2.Value
1189         } else {
1190                 expected := p.mustGetOnlyFile().piecesRoot.Unwrap()
1191                 h := merkle.NewHash()
1192                 differingPeers, err = t.hashPieceWithSpecificHash(piece, h)
1193                 var sum [32]byte
1194                 // This is *not* padded to piece length.
1195                 sumExactly(sum[:], h.Sum)
1196                 correct = sum == expected
1197         }
1198         return
1199 }
1200
1201 func sumExactly(dst []byte, sum func(b []byte) []byte) {
1202         n := len(sum(dst[:0]))
1203         if n != len(dst) {
1204                 panic(n)
1205         }
1206 }
1207
1208 func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) (
1209         // These are peers that sent us blocks that differ from what we hash here.
1210         differingPeers map[bannableAddr]struct{},
1211         err error,
1212 ) {
1213         p := t.piece(piece)
1214         storagePiece := p.Storage()
1215
1216         smartBanWriter := t.smartBanBlockCheckingWriter(piece)
1217         multiWriter := io.MultiWriter(h, smartBanWriter)
1218         {
1219                 var written int64
1220                 written, err = storagePiece.WriteTo(multiWriter)
1221                 if err == nil && written != int64(p.length()) {
1222                         err = fmt.Errorf("wrote %v bytes from storage, piece has length %v", written, p.length())
1223                         // Skip smart banning since we can't blame them for storage issues. A short write would
1224                         // ban peers for all recorded blocks that weren't just written.
1225                         return
1226                 }
1227         }
1228         // Flush before writing padding, since we would not have recorded the padding blocks.
1229         smartBanWriter.Flush()
1230         differingPeers = smartBanWriter.badPeers
1231         return
1232 }
1233
1234 func (t *Torrent) haveAnyPieces() bool {
1235         return !t._completedPieces.IsEmpty()
1236 }
1237
1238 func (t *Torrent) haveAllPieces() bool {
1239         if !t.haveInfo() {
1240                 return false
1241         }
1242         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
1243 }
1244
1245 func (t *Torrent) havePiece(index pieceIndex) bool {
1246         return t.haveInfo() && t.pieceComplete(index)
1247 }
1248
1249 func (t *Torrent) maybeDropMutuallyCompletePeer(
1250         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
1251         // okay?
1252         p *PeerConn,
1253 ) {
1254         if !t.cl.config.DropMutuallyCompletePeers {
1255                 return
1256         }
1257         if !t.haveAllPieces() {
1258                 return
1259         }
1260         if all, known := p.peerHasAllPieces(); !(known && all) {
1261                 return
1262         }
1263         if p.useful() {
1264                 return
1265         }
1266         p.logger.Levelf(log.Debug, "is mutually complete; dropping")
1267         p.drop()
1268 }
1269
1270 func (t *Torrent) haveChunk(r Request) (ret bool) {
1271         // defer func() {
1272         //      log.Println("have chunk", r, ret)
1273         // }()
1274         if !t.haveInfo() {
1275                 return false
1276         }
1277         if t.pieceComplete(pieceIndex(r.Index)) {
1278                 return true
1279         }
1280         p := &t.pieces[r.Index]
1281         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
1282 }
1283
1284 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
1285         return chunkIndexType(cs.Begin / chunkSize)
1286 }
1287
1288 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
1289         return !t._pendingPieces.IsEmpty() && t._pendingPieces.Contains(uint32(index))
1290 }
1291
1292 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1293 // conns (which is a map).
1294 var peerConnSlices sync.Pool
1295
1296 func getPeerConnSlice(cap int) []*PeerConn {
1297         getInterface := peerConnSlices.Get()
1298         if getInterface == nil {
1299                 return make([]*PeerConn, 0, cap)
1300         } else {
1301                 return getInterface.([]*PeerConn)[:0]
1302         }
1303 }
1304
1305 // Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
1306 // this is a frequent occurrence.
1307 func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
1308         sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
1309         f(sl)
1310         peerConnSlices.Put(sl)
1311 }
1312
1313 func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
1314         wcs := worseConnSlice{conns: sl}
1315         wcs.initKeys(opts)
1316         heap.Init(&wcs)
1317         for wcs.Len() != 0 {
1318                 c := heap.Pop(&wcs).(*PeerConn)
1319                 if opts.incomingIsBad && !c.outgoing {
1320                         return c
1321                 }
1322                 if opts.outgoingIsBad && c.outgoing {
1323                         return c
1324                 }
1325                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1326                         return c
1327                 }
1328                 // If the connection is in the worst half of the established
1329                 // connection quota and is older than a minute.
1330                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1331                         // Give connections 1 minute to prove themselves.
1332                         if time.Since(c.completedHandshake) > time.Minute {
1333                                 return c
1334                         }
1335                 }
1336         }
1337         return nil
1338 }
1339
1340 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1341 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1342 // established connections for more than a minute. This is O(n log n). If there was a way to not
1343 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1344 func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
1345         t.withUnclosedConns(func(ucs []*PeerConn) {
1346                 ret = t.worstBadConnFromSlice(opts, ucs)
1347         })
1348         return
1349 }
1350
1351 type PieceStateChange struct {
1352         Index int
1353         PieceState
1354 }
1355
1356 func (t *Torrent) publishPieceStateChange(piece pieceIndex) {
1357         t.cl._mu.Defer(func() {
1358                 cur := t.pieceState(piece)
1359                 p := &t.pieces[piece]
1360                 if cur != p.publicPieceState {
1361                         p.publicPieceState = cur
1362                         t.pieceStateChanges.Publish(PieceStateChange{
1363                                 int(piece),
1364                                 cur,
1365                         })
1366                 }
1367         })
1368 }
1369
1370 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1371         if t.pieceComplete(piece) {
1372                 return 0
1373         }
1374         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1375 }
1376
1377 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1378         return t.pieces[piece].allChunksDirty()
1379 }
1380
1381 func (t *Torrent) readersChanged() {
1382         t.updateReaderPieces()
1383         t.updateAllPiecePriorities("Torrent.readersChanged")
1384 }
1385
1386 func (t *Torrent) updateReaderPieces() {
1387         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1388 }
1389
1390 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1391         if from == to {
1392                 return
1393         }
1394         t.updateReaderPieces()
1395         // Order the ranges, high and low.
1396         l, h := from, to
1397         if l.begin > h.begin {
1398                 l, h = h, l
1399         }
1400         if l.end < h.begin {
1401                 // Two distinct ranges.
1402                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1403                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1404         } else {
1405                 // Ranges overlap.
1406                 end := l.end
1407                 if h.end > end {
1408                         end = h.end
1409                 }
1410                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1411         }
1412 }
1413
1414 func (t *Torrent) maybeNewConns() {
1415         // Tickle the accept routine.
1416         t.cl.event.Broadcast()
1417         t.openNewConns()
1418 }
1419
1420 func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason string) {
1421         if t._pendingPieces.Contains(uint32(piece)) {
1422                 t.iterPeers(func(c *Peer) {
1423                         // if c.requestState.Interested {
1424                         //      return
1425                         // }
1426                         if !c.isLowOnRequests() {
1427                                 return
1428                         }
1429                         if !c.peerHasPiece(piece) {
1430                                 return
1431                         }
1432                         if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
1433                                 return
1434                         }
1435                         c.updateRequests(reason)
1436                 })
1437         }
1438         t.maybeNewConns()
1439         t.publishPieceStateChange(piece)
1440 }
1441
1442 func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) {
1443         if !t.closed.IsSet() {
1444                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1445                 // request order.
1446                 t.updatePieceRequestOrderPiece(piece)
1447         }
1448         p := t.piece(piece)
1449         newPrio := p.effectivePriority()
1450         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1451         if newPrio == PiecePriorityNone && p.haveHash() {
1452                 return t._pendingPieces.CheckedRemove(uint32(piece))
1453         } else {
1454                 return t._pendingPieces.CheckedAdd(uint32(piece))
1455         }
1456 }
1457
1458 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1459         if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
1460                 t.onPiecePendingTriggers(piece, reason)
1461         }
1462         t.updatePieceRequestOrderPiece(piece)
1463 }
1464
1465 func (t *Torrent) updateAllPiecePriorities(reason string) {
1466         t.updatePiecePriorities(0, t.numPieces(), reason)
1467 }
1468
1469 // Update all piece priorities in one hit. This function should have the same
1470 // output as updatePiecePriority, but across all pieces.
1471 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1472         for i := begin; i < end; i++ {
1473                 t.updatePiecePriority(i, reason)
1474         }
1475 }
1476
1477 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1478 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1479         if off >= t.length() {
1480                 return
1481         }
1482         if off < 0 {
1483                 size += off
1484                 off = 0
1485         }
1486         if size <= 0 {
1487                 return
1488         }
1489         begin = pieceIndex(off / t.info.PieceLength)
1490         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1491         if end > pieceIndex(t.info.NumPieces()) {
1492                 end = pieceIndex(t.info.NumPieces())
1493         }
1494         return
1495 }
1496
1497 // Returns true if all iterations complete without breaking. Returns the read regions for all
1498 // readers. The reader regions should not be merged as some callers depend on this method to
1499 // enumerate readers.
1500 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1501         for r := range t.readers {
1502                 p := r.pieces
1503                 if p.begin >= p.end {
1504                         continue
1505                 }
1506                 if !f(p.begin, p.end) {
1507                         return false
1508                 }
1509         }
1510         return true
1511 }
1512
1513 func (t *Torrent) pendRequest(req RequestIndex) {
1514         t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
1515 }
1516
1517 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1518         t.cl.event.Broadcast()
1519         if t.pieceComplete(piece) {
1520                 t.onPieceCompleted(piece)
1521         } else {
1522                 t.onIncompletePiece(piece)
1523         }
1524         t.updatePiecePriority(piece, reason)
1525 }
1526
1527 func (t *Torrent) numReceivedConns() (ret int) {
1528         for c := range t.conns {
1529                 if c.Discovery == PeerSourceIncoming {
1530                         ret++
1531                 }
1532         }
1533         return
1534 }
1535
1536 func (t *Torrent) numOutgoingConns() (ret int) {
1537         for c := range t.conns {
1538                 if c.outgoing {
1539                         ret++
1540                 }
1541         }
1542         return
1543 }
1544
1545 func (t *Torrent) maxHalfOpen() int {
1546         // Note that if we somehow exceed the maximum established conns, we want
1547         // the negative value to have an effect.
1548         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1549         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1550         // We want to allow some experimentation with new peers, and to try to
1551         // upset an oversupply of received connections.
1552         return int(min(
1553                 max(5, extraIncoming)+establishedHeadroom,
1554                 int64(t.cl.config.HalfOpenConnsPerTorrent),
1555         ))
1556 }
1557
1558 func (t *Torrent) openNewConns() (initiated int) {
1559         defer t.updateWantPeersEvent()
1560         for t.peers.Len() != 0 {
1561                 if !t.wantOutgoingConns() {
1562                         return
1563                 }
1564                 if len(t.halfOpen) >= t.maxHalfOpen() {
1565                         return
1566                 }
1567                 if len(t.cl.dialers) == 0 {
1568                         return
1569                 }
1570                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1571                         return
1572                 }
1573                 p := t.peers.PopMax()
1574                 opts := outgoingConnOpts{
1575                         peerInfo:                 p,
1576                         t:                        t,
1577                         requireRendezvous:        false,
1578                         skipHolepunchRendezvous:  false,
1579                         receivedHolepunchConnect: false,
1580                         HeaderObfuscationPolicy:  t.cl.config.HeaderObfuscationPolicy,
1581                 }
1582                 initiateConn(opts, false)
1583                 initiated++
1584         }
1585         return
1586 }
1587
1588 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1589         p := t.piece(piece)
1590         uncached := t.pieceCompleteUncached(piece)
1591         cached := p.completion()
1592         changed := cached != uncached
1593         complete := uncached.Complete
1594         p.storageCompletionOk = uncached.Ok
1595         x := uint32(piece)
1596         if complete {
1597                 t._completedPieces.Add(x)
1598                 t.openNewConns()
1599         } else {
1600                 t._completedPieces.Remove(x)
1601         }
1602         p.t.updatePieceRequestOrderPiece(piece)
1603         t.updateComplete()
1604         if complete && len(p.dirtiers) != 0 {
1605                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1606         }
1607         if changed {
1608                 //slog.Debug(
1609                 //      "piece completion changed",
1610                 //      slog.Int("piece", piece),
1611                 //      slog.Any("from", cached),
1612                 //      slog.Any("to", uncached))
1613                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1614         }
1615         return changed
1616 }
1617
1618 // Non-blocking read. Client lock is not required.
1619 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1620         for len(b) != 0 {
1621                 p := &t.pieces[off/t.info.PieceLength]
1622                 p.waitNoPendingWrites()
1623                 var n1 int
1624                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1625                 if n1 == 0 {
1626                         break
1627                 }
1628                 off += int64(n1)
1629                 n += n1
1630                 b = b[n1:]
1631         }
1632         return
1633 }
1634
1635 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1636 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1637 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1638 func (t *Torrent) maybeCompleteMetadata() error {
1639         if t.haveInfo() {
1640                 // Nothing to do.
1641                 return nil
1642         }
1643         if !t.haveAllMetadataPieces() {
1644                 // Don't have enough metadata pieces.
1645                 return nil
1646         }
1647         err := t.setInfoBytesLocked(t.metadataBytes)
1648         if err != nil {
1649                 t.invalidateMetadata()
1650                 return fmt.Errorf("error setting info bytes: %s", err)
1651         }
1652         if t.cl.config.Debug {
1653                 t.logger.Printf("%s: got metadata from peers", t)
1654         }
1655         return nil
1656 }
1657
1658 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1659         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1660                 if end > begin {
1661                         now.Add(bitmap.BitIndex(begin))
1662                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1663                 }
1664                 return true
1665         })
1666         return
1667 }
1668
1669 func (t *Torrent) needData() bool {
1670         if t.closed.IsSet() {
1671                 return false
1672         }
1673         if !t.haveInfo() {
1674                 return true
1675         }
1676         return !t._pendingPieces.IsEmpty()
1677 }
1678
1679 func appendMissingStrings(old, new []string) (ret []string) {
1680         ret = old
1681 new:
1682         for _, n := range new {
1683                 for _, o := range old {
1684                         if o == n {
1685                                 continue new
1686                         }
1687                 }
1688                 ret = append(ret, n)
1689         }
1690         return
1691 }
1692
1693 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1694         ret = existing
1695         for minNumTiers > len(ret) {
1696                 ret = append(ret, nil)
1697         }
1698         return
1699 }
1700
1701 func (t *Torrent) addTrackers(announceList [][]string) {
1702         fullAnnounceList := &t.metainfo.AnnounceList
1703         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1704         for tierIndex, trackerURLs := range announceList {
1705                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1706         }
1707         t.startMissingTrackerScrapers()
1708         t.updateWantPeersEvent()
1709 }
1710
1711 // Don't call this before the info is available.
1712 func (t *Torrent) bytesCompleted() int64 {
1713         if !t.haveInfo() {
1714                 return 0
1715         }
1716         return t.length() - t.bytesLeft()
1717 }
1718
1719 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1720         t.cl.lock()
1721         defer t.cl.unlock()
1722         return t.setInfoBytesLocked(b)
1723 }
1724
1725 // Returns true if connection is removed from torrent.Conns.
1726 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1727         if !c.closed.IsSet() {
1728                 panic("connection is not closed")
1729                 // There are behaviours prevented by the closed state that will fail
1730                 // if the connection has been deleted.
1731         }
1732         _, ret = t.conns[c]
1733         delete(t.conns, c)
1734         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1735         // the drop event against the PexConnState instead.
1736         if ret {
1737                 if !t.cl.config.DisablePEX {
1738                         t.pex.Drop(c)
1739                 }
1740         }
1741         torrent.Add("deleted connections", 1)
1742         c.deleteAllRequests("Torrent.deletePeerConn")
1743         t.assertPendingRequests()
1744         if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
1745                 panic(t.connsWithAllPieces)
1746         }
1747         return
1748 }
1749
1750 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1751         if t.deleteConnWithAllPieces(p) {
1752                 return
1753         }
1754         if !t.haveInfo() {
1755                 return
1756         }
1757         p.peerPieces().Iterate(func(i uint32) bool {
1758                 p.t.decPieceAvailability(pieceIndex(i))
1759                 return true
1760         })
1761 }
1762
1763 func (t *Torrent) assertPendingRequests() {
1764         if !check.Enabled {
1765                 return
1766         }
1767         // var actual pendingRequests
1768         // if t.haveInfo() {
1769         //      actual.m = make([]int, t.numChunks())
1770         // }
1771         // t.iterPeers(func(p *Peer) {
1772         //      p.requestState.Requests.Iterate(func(x uint32) bool {
1773         //              actual.Inc(x)
1774         //              return true
1775         //      })
1776         // })
1777         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1778         // if diff != "" {
1779         //      panic(diff)
1780         // }
1781 }
1782
1783 func (t *Torrent) dropConnection(c *PeerConn) {
1784         t.cl.event.Broadcast()
1785         c.close()
1786         if t.deletePeerConn(c) {
1787                 t.openNewConns()
1788         }
1789 }
1790
1791 // Peers as in contact information for dialing out.
1792 func (t *Torrent) wantPeers() bool {
1793         if t.closed.IsSet() {
1794                 return false
1795         }
1796         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1797                 return false
1798         }
1799         return t.wantOutgoingConns()
1800 }
1801
1802 func (t *Torrent) updateWantPeersEvent() {
1803         if t.wantPeers() {
1804                 t.wantPeersEvent.Set()
1805         } else {
1806                 t.wantPeersEvent.Clear()
1807         }
1808 }
1809
1810 // Returns whether the client should make effort to seed the torrent.
1811 func (t *Torrent) seeding() bool {
1812         cl := t.cl
1813         if t.closed.IsSet() {
1814                 return false
1815         }
1816         if t.dataUploadDisallowed {
1817                 return false
1818         }
1819         if cl.config.NoUpload {
1820                 return false
1821         }
1822         if !cl.config.Seed {
1823                 return false
1824         }
1825         if cl.config.DisableAggressiveUpload && t.needData() {
1826                 return false
1827         }
1828         return true
1829 }
1830
1831 func (t *Torrent) onWebRtcConn(
1832         c datachannel.ReadWriteCloser,
1833         dcc webtorrent.DataChannelContext,
1834 ) {
1835         defer c.Close()
1836         netConn := webrtcNetConn{
1837                 ReadWriteCloser:    c,
1838                 DataChannelContext: dcc,
1839         }
1840         peerRemoteAddr := netConn.RemoteAddr()
1841         //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
1842         if t.cl.badPeerAddr(peerRemoteAddr) {
1843                 return
1844         }
1845         localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
1846         pc, err := t.cl.initiateProtocolHandshakes(
1847                 context.Background(),
1848                 netConn,
1849                 t,
1850                 false,
1851                 newConnectionOpts{
1852                         outgoing:        dcc.LocalOffered,
1853                         remoteAddr:      peerRemoteAddr,
1854                         localPublicAddr: localAddrIpPort,
1855                         network:         webrtcNetwork,
1856                         connString:      fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
1857                 },
1858         )
1859         if err != nil {
1860                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1861                 return
1862         }
1863         if dcc.LocalOffered {
1864                 pc.Discovery = PeerSourceTracker
1865         } else {
1866                 pc.Discovery = PeerSourceIncoming
1867         }
1868         pc.conn.SetWriteDeadline(time.Time{})
1869         t.cl.lock()
1870         defer t.cl.unlock()
1871         err = t.runHandshookConn(pc)
1872         if err != nil {
1873                 t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
1874         }
1875 }
1876
1877 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1878         err := t.runHandshookConn(pc)
1879         if err != nil || logAll {
1880                 t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
1881         }
1882 }
1883
1884 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1885         t.logRunHandshookConn(pc, false, log.Debug)
1886 }
1887
1888 func (t *Torrent) startWebsocketAnnouncer(u url.URL, shortInfohash [20]byte) torrentTrackerAnnouncer {
1889         wtc, release := t.cl.websocketTrackers.Get(u.String(), shortInfohash)
1890         // This needs to run before the Torrent is dropped from the Client, to prevent a new
1891         // webtorrent.TrackerClient for the same info hash before the old one is cleaned up.
1892         t.onClose = append(t.onClose, release)
1893         wst := websocketTrackerStatus{u, wtc}
1894         go func() {
1895                 err := wtc.Announce(tracker.Started, shortInfohash)
1896                 if err != nil {
1897                         t.logger.WithDefaultLevel(log.Warning).Printf(
1898                                 "error in initial announce to %q: %v",
1899                                 u.String(), err,
1900                         )
1901                 }
1902         }()
1903         return wst
1904 }
1905
1906 func (t *Torrent) startScrapingTracker(_url string) {
1907         if _url == "" {
1908                 return
1909         }
1910         u, err := url.Parse(_url)
1911         if err != nil {
1912                 // URLs with a leading '*' appear to be a uTorrent convention to disable trackers.
1913                 if _url[0] != '*' {
1914                         t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err)
1915                 }
1916                 return
1917         }
1918         if u.Scheme == "udp" {
1919                 u.Scheme = "udp4"
1920                 t.startScrapingTracker(u.String())
1921                 u.Scheme = "udp6"
1922                 t.startScrapingTracker(u.String())
1923                 return
1924         }
1925         if t.infoHash.Ok {
1926                 t.startScrapingTrackerWithInfohash(u, _url, t.infoHash.Value)
1927         }
1928         if t.infoHashV2.Ok {
1929                 t.startScrapingTrackerWithInfohash(u, _url, *t.infoHashV2.Value.ToShort())
1930         }
1931 }
1932
1933 func (t *Torrent) startScrapingTrackerWithInfohash(u *url.URL, urlStr string, shortInfohash [20]byte) {
1934         announcerKey := torrentTrackerAnnouncerKey{
1935                 shortInfohash: shortInfohash,
1936                 url:           urlStr,
1937         }
1938         if _, ok := t.trackerAnnouncers[announcerKey]; ok {
1939                 return
1940         }
1941         sl := func() torrentTrackerAnnouncer {
1942                 switch u.Scheme {
1943                 case "ws", "wss":
1944                         if t.cl.config.DisableWebtorrent {
1945                                 return nil
1946                         }
1947                         return t.startWebsocketAnnouncer(*u, shortInfohash)
1948                 case "udp4":
1949                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1950                                 return nil
1951                         }
1952                 case "udp6":
1953                         if t.cl.config.DisableIPv6 {
1954                                 return nil
1955                         }
1956                 }
1957                 newAnnouncer := &trackerScraper{
1958                         shortInfohash:   shortInfohash,
1959                         u:               *u,
1960                         t:               t,
1961                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1962                 }
1963                 go newAnnouncer.Run()
1964                 return newAnnouncer
1965         }()
1966         if sl == nil {
1967                 return
1968         }
1969         g.MakeMapIfNil(&t.trackerAnnouncers)
1970         if g.MapInsert(t.trackerAnnouncers, announcerKey, sl).Ok {
1971                 panic("tracker announcer already exists")
1972         }
1973 }
1974
1975 // Adds and starts tracker scrapers for tracker URLs that aren't already
1976 // running.
1977 func (t *Torrent) startMissingTrackerScrapers() {
1978         if t.cl.config.DisableTrackers {
1979                 return
1980         }
1981         t.startScrapingTracker(t.metainfo.Announce)
1982         for _, tier := range t.metainfo.AnnounceList {
1983                 for _, url := range tier {
1984                         t.startScrapingTracker(url)
1985                 }
1986         }
1987 }
1988
1989 // Returns an AnnounceRequest with fields filled out to defaults and current
1990 // values.
1991 func (t *Torrent) announceRequest(
1992         event tracker.AnnounceEvent,
1993         shortInfohash [20]byte,
1994 ) tracker.AnnounceRequest {
1995         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1996         // dependent on the network in use.
1997         return tracker.AnnounceRequest{
1998                 Event: event,
1999                 NumWant: func() int32 {
2000                         if t.wantPeers() && len(t.cl.dialers) > 0 {
2001                                 // Windozer has UDP packet limit. See:
2002                                 // https://github.com/anacrolix/torrent/issues/764
2003                                 return 200
2004                         } else {
2005                                 return 0
2006                         }
2007                 }(),
2008                 Port:     uint16(t.cl.incomingPeerPort()),
2009                 PeerId:   t.cl.peerID,
2010                 InfoHash: shortInfohash,
2011                 Key:      t.cl.announceKey(),
2012
2013                 // The following are vaguely described in BEP 3.
2014
2015                 Left:     t.bytesLeftAnnounce(),
2016                 Uploaded: t.stats.BytesWrittenData.Int64(),
2017                 // There's no mention of wasted or unwanted download in the BEP.
2018                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
2019         }
2020 }
2021
2022 // Adds peers revealed in an announce until the announce ends, or we have
2023 // enough peers.
2024 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
2025         cl := t.cl
2026         for v := range pvs {
2027                 cl.lock()
2028                 added := 0
2029                 for _, cp := range v.Peers {
2030                         if cp.Port == 0 {
2031                                 // Can't do anything with this.
2032                                 continue
2033                         }
2034                         if t.addPeer(PeerInfo{
2035                                 Addr:   ipPortAddr{cp.IP, cp.Port},
2036                                 Source: PeerSourceDhtGetPeers,
2037                         }) {
2038                                 added++
2039                         }
2040                 }
2041                 cl.unlock()
2042                 // if added != 0 {
2043                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
2044                 // }
2045         }
2046 }
2047
2048 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
2049 // announce ends. stop will force the announce to end. This interface is really old-school, and
2050 // calls a private one that is much more modern. Both v1 and v2 info hashes are announced if they
2051 // exist.
2052 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
2053         var ihs [][20]byte
2054         t.cl.lock()
2055         t.eachShortInfohash(func(short [20]byte) {
2056                 ihs = append(ihs, short)
2057         })
2058         t.cl.unlock()
2059         ctx, stop := context.WithCancel(context.Background())
2060         eg, ctx := errgroup.WithContext(ctx)
2061         for _, ih := range ihs {
2062                 var ann DhtAnnounce
2063                 ann, err = s.Announce(ih, t.cl.incomingPeerPort(), true)
2064                 if err != nil {
2065                         stop()
2066                         return
2067                 }
2068                 eg.Go(func() error {
2069                         return t.dhtAnnounceConsumer(ctx, ann)
2070                 })
2071         }
2072         _done := make(chan struct{})
2073         done = _done
2074         go func() {
2075                 defer stop()
2076                 defer close(_done)
2077                 eg.Wait()
2078         }()
2079         return
2080 }
2081
2082 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
2083 // announce ends. stop will force the announce to end.
2084 func (t *Torrent) dhtAnnounceConsumer(
2085         ctx context.Context,
2086         ps DhtAnnounce,
2087 ) (
2088         err error,
2089 ) {
2090         defer ps.Close()
2091         done := make(chan struct{})
2092         go func() {
2093                 defer close(done)
2094                 t.consumeDhtAnnouncePeers(ps.Peers())
2095         }()
2096         select {
2097         case <-ctx.Done():
2098                 return context.Cause(ctx)
2099         case <-done:
2100                 return nil
2101         }
2102 }
2103
2104 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
2105         _, stop, err := t.AnnounceToDht(s)
2106         if err != nil {
2107                 return err
2108         }
2109         select {
2110         case <-t.closed.Done():
2111         case <-time.After(5 * time.Minute):
2112         }
2113         stop()
2114         return nil
2115 }
2116
2117 func (t *Torrent) dhtAnnouncer(s DhtServer) {
2118         cl := t.cl
2119         cl.lock()
2120         defer cl.unlock()
2121         for {
2122                 for {
2123                         if t.closed.IsSet() {
2124                                 return
2125                         }
2126                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
2127                         // TODO: We can include the announce_peer step depending on whether we can receive
2128                         // inbound connections. We should probably only announce once every 15 mins too.
2129                         if !t.wantAnyConns() {
2130                                 goto wait
2131                         }
2132                         // TODO: Determine if there's a listener on the port we're announcing.
2133                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
2134                                 goto wait
2135                         }
2136                         break
2137                 wait:
2138                         cl.event.Wait()
2139                 }
2140                 func() {
2141                         t.numDHTAnnounces++
2142                         cl.unlock()
2143                         defer cl.lock()
2144                         err := t.timeboxedAnnounceToDht(s)
2145                         if err != nil {
2146                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
2147                         }
2148                 }()
2149         }
2150 }
2151
2152 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
2153         for _, p := range peers {
2154                 if t.addPeer(p) {
2155                         added++
2156                 }
2157         }
2158         return
2159 }
2160
2161 // The returned TorrentStats may require alignment in memory. See
2162 // https://github.com/anacrolix/torrent/issues/383.
2163 func (t *Torrent) Stats() TorrentStats {
2164         t.cl.rLock()
2165         defer t.cl.rUnlock()
2166         return t.statsLocked()
2167 }
2168
2169 func (t *Torrent) statsLocked() (ret TorrentStats) {
2170         ret.ActivePeers = len(t.conns)
2171         ret.HalfOpenPeers = len(t.halfOpen)
2172         ret.PendingPeers = t.peers.Len()
2173         ret.TotalPeers = t.numTotalPeers()
2174         ret.ConnectedSeeders = 0
2175         for c := range t.conns {
2176                 if all, ok := c.peerHasAllPieces(); all && ok {
2177                         ret.ConnectedSeeders++
2178                 }
2179         }
2180         ret.ConnStats = t.stats.Copy()
2181         ret.PiecesComplete = t.numPiecesCompleted()
2182         return
2183 }
2184
2185 // The total number of peers in the torrent.
2186 func (t *Torrent) numTotalPeers() int {
2187         peers := make(map[string]struct{})
2188         for conn := range t.conns {
2189                 ra := conn.conn.RemoteAddr()
2190                 if ra == nil {
2191                         // It's been closed and doesn't support RemoteAddr.
2192                         continue
2193                 }
2194                 peers[ra.String()] = struct{}{}
2195         }
2196         for addr := range t.halfOpen {
2197                 peers[addr] = struct{}{}
2198         }
2199         t.peers.Each(func(peer PeerInfo) {
2200                 peers[peer.Addr.String()] = struct{}{}
2201         })
2202         return len(peers)
2203 }
2204
2205 // Reconcile bytes transferred before connection was associated with a
2206 // torrent.
2207 func (t *Torrent) reconcileHandshakeStats(c *Peer) {
2208         if c._stats != (ConnStats{
2209                 // Handshakes should only increment these fields:
2210                 BytesWritten: c._stats.BytesWritten,
2211                 BytesRead:    c._stats.BytesRead,
2212         }) {
2213                 panic("bad stats")
2214         }
2215         c.postHandshakeStats(func(cs *ConnStats) {
2216                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
2217                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
2218         })
2219         c.reconciledHandshakeStats = true
2220 }
2221
2222 // Returns true if the connection is added.
2223 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
2224         defer func() {
2225                 if err == nil {
2226                         torrent.Add("added connections", 1)
2227                 }
2228         }()
2229         if t.closed.IsSet() {
2230                 return errors.New("torrent closed")
2231         }
2232         for c0 := range t.conns {
2233                 if c.PeerID != c0.PeerID {
2234                         continue
2235                 }
2236                 if !t.cl.config.DropDuplicatePeerIds {
2237                         continue
2238                 }
2239                 if c.hasPreferredNetworkOver(c0) {
2240                         c0.close()
2241                         t.deletePeerConn(c0)
2242                 } else {
2243                         return errors.New("existing connection preferred")
2244                 }
2245         }
2246         if len(t.conns) >= t.maxEstablishedConns {
2247                 numOutgoing := t.numOutgoingConns()
2248                 numIncoming := len(t.conns) - numOutgoing
2249                 c := t.worstBadConn(worseConnLensOpts{
2250                         // We've already established that we have too many connections at this point, so we just
2251                         // need to match what kind we have too many of vs. what we're trying to add now.
2252                         incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
2253                         outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
2254                 })
2255                 if c == nil {
2256                         return errors.New("don't want conn")
2257                 }
2258                 c.close()
2259                 t.deletePeerConn(c)
2260         }
2261         if len(t.conns) >= t.maxEstablishedConns {
2262                 panic(len(t.conns))
2263         }
2264         t.conns[c] = struct{}{}
2265         t.cl.event.Broadcast()
2266         // We'll never receive the "p" extended handshake parameter.
2267         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
2268                 t.pex.Add(c)
2269         }
2270         return nil
2271 }
2272
2273 func (t *Torrent) newConnsAllowed() bool {
2274         if !t.networkingEnabled.Bool() {
2275                 return false
2276         }
2277         if t.closed.IsSet() {
2278                 return false
2279         }
2280         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2281                 return false
2282         }
2283         return true
2284 }
2285
2286 func (t *Torrent) wantAnyConns() bool {
2287         if !t.networkingEnabled.Bool() {
2288                 return false
2289         }
2290         if t.closed.IsSet() {
2291                 return false
2292         }
2293         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2294                 return false
2295         }
2296         return len(t.conns) < t.maxEstablishedConns
2297 }
2298
2299 func (t *Torrent) wantOutgoingConns() bool {
2300         if !t.newConnsAllowed() {
2301                 return false
2302         }
2303         if len(t.conns) < t.maxEstablishedConns {
2304                 return true
2305         }
2306         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2307         return t.worstBadConn(worseConnLensOpts{
2308                 incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
2309                 outgoingIsBad: false,
2310         }) != nil
2311 }
2312
2313 func (t *Torrent) wantIncomingConns() bool {
2314         if !t.newConnsAllowed() {
2315                 return false
2316         }
2317         if len(t.conns) < t.maxEstablishedConns {
2318                 return true
2319         }
2320         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2321         return t.worstBadConn(worseConnLensOpts{
2322                 incomingIsBad: false,
2323                 outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
2324         }) != nil
2325 }
2326
2327 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
2328         t.cl.lock()
2329         defer t.cl.unlock()
2330         oldMax = t.maxEstablishedConns
2331         t.maxEstablishedConns = max
2332         wcs := worseConnSlice{
2333                 conns: t.appendConns(nil, func(*PeerConn) bool {
2334                         return true
2335                 }),
2336         }
2337         wcs.initKeys(worseConnLensOpts{})
2338         heap.Init(&wcs)
2339         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
2340                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
2341         }
2342         t.openNewConns()
2343         return oldMax
2344 }
2345
2346 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
2347         t.logger.LazyLog(log.Debug, func() log.Msg {
2348                 return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
2349         })
2350         p := t.piece(piece)
2351         p.numVerifies++
2352         t.cl.event.Broadcast()
2353         if t.closed.IsSet() {
2354                 return
2355         }
2356
2357         // Don't score the first time a piece is hashed, it could be an initial check.
2358         if p.storageCompletionOk {
2359                 if passed {
2360                         pieceHashedCorrect.Add(1)
2361                 } else {
2362                         log.Fmsg(
2363                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
2364                         ).AddValues(t, p).LogLevel(log.Info, t.logger)
2365                         pieceHashedNotCorrect.Add(1)
2366                 }
2367         }
2368
2369         p.marking = true
2370         t.publishPieceStateChange(piece)
2371         defer func() {
2372                 p.marking = false
2373                 t.publishPieceStateChange(piece)
2374         }()
2375
2376         if passed {
2377                 if len(p.dirtiers) != 0 {
2378                         // Don't increment stats above connection-level for every involved connection.
2379                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
2380                 }
2381                 for c := range p.dirtiers {
2382                         c._stats.incrementPiecesDirtiedGood()
2383                 }
2384                 t.clearPieceTouchers(piece)
2385                 hasDirty := p.hasDirtyChunks()
2386                 t.cl.unlock()
2387                 if hasDirty {
2388                         p.Flush() // You can be synchronous here!
2389                 }
2390                 err := p.Storage().MarkComplete()
2391                 if err != nil {
2392                         t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err)
2393                 }
2394                 t.cl.lock()
2395
2396                 if t.closed.IsSet() {
2397                         return
2398                 }
2399                 t.pendAllChunkSpecs(piece)
2400         } else {
2401                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
2402                         // Peers contributed to all the data for this piece hash failure, and the failure was
2403                         // not due to errors in the storage (such as data being dropped in a cache).
2404
2405                         // Increment Torrent and above stats, and then specific connections.
2406                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
2407                         for c := range p.dirtiers {
2408                                 // Y u do dis peer?!
2409                                 c.stats().incrementPiecesDirtiedBad()
2410                         }
2411
2412                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
2413                         for c := range p.dirtiers {
2414                                 if !c.trusted {
2415                                         bannableTouchers = append(bannableTouchers, c)
2416                                 }
2417                         }
2418                         t.clearPieceTouchers(piece)
2419                         slices.Sort(bannableTouchers, connLessTrusted)
2420
2421                         if t.cl.config.Debug {
2422                                 t.logger.Printf(
2423                                         "bannable conns by trust for piece %d: %v",
2424                                         piece,
2425                                         func() (ret []connectionTrust) {
2426                                                 for _, c := range bannableTouchers {
2427                                                         ret = append(ret, c.trust())
2428                                                 }
2429                                                 return
2430                                         }(),
2431                                 )
2432                         }
2433
2434                         if len(bannableTouchers) >= 1 {
2435                                 c := bannableTouchers[0]
2436                                 if len(bannableTouchers) != 1 {
2437                                         t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
2438                                 } else {
2439                                         // Turns out it's still useful to ban peers like this because if there's only a
2440                                         // single peer for a piece, and we never progress that piece to completion, we
2441                                         // will never smart-ban them. Discovered in
2442                                         // https://github.com/anacrolix/torrent/issues/715.
2443                                         t.logger.Levelf(
2444                                                 log.Warning,
2445                                                 "banning %v for being sole dirtier of piece %v after failed piece check",
2446                                                 c,
2447                                                 piece,
2448                                         )
2449                                         c.ban()
2450                                 }
2451                         }
2452                 }
2453                 t.onIncompletePiece(piece)
2454                 p.Storage().MarkNotComplete()
2455         }
2456         t.updatePieceCompletion(piece)
2457 }
2458
2459 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
2460         start := t.pieceRequestIndexOffset(piece)
2461         end := start + t.pieceNumChunks(piece)
2462         for ri := start; ri < end; ri++ {
2463                 t.cancelRequest(ri)
2464         }
2465 }
2466
2467 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
2468         t.pendAllChunkSpecs(piece)
2469         t.cancelRequestsForPiece(piece)
2470         t.piece(piece).readerCond.Broadcast()
2471         for conn := range t.conns {
2472                 conn.have(piece)
2473                 t.maybeDropMutuallyCompletePeer(conn)
2474         }
2475 }
2476
2477 // Called when a piece is found to be not complete.
2478 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2479         if t.pieceAllDirty(piece) {
2480                 t.pendAllChunkSpecs(piece)
2481         }
2482         if !t.wantPieceIndex(piece) {
2483                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2484                 return
2485         }
2486         // We could drop any connections that we told we have a piece that we
2487         // don't here. But there's a test failure, and it seems clients don't care
2488         // if you request pieces that you already claim to have. Pruning bad
2489         // connections might just remove any connections that aren't treating us
2490         // favourably anyway.
2491
2492         // for c := range t.conns {
2493         //      if c.sentHave(piece) {
2494         //              c.drop()
2495         //      }
2496         // }
2497         t.iterPeers(func(conn *Peer) {
2498                 if conn.peerHasPiece(piece) {
2499                         conn.updateRequests("piece incomplete")
2500                 }
2501         })
2502 }
2503
2504 func (t *Torrent) tryCreateMorePieceHashers() {
2505         for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() {
2506         }
2507 }
2508
2509 func (t *Torrent) tryCreatePieceHasher() bool {
2510         if t.storage == nil {
2511                 return false
2512         }
2513         pi, ok := t.getPieceToHash()
2514         if !ok {
2515                 return false
2516         }
2517         p := t.piece(pi)
2518         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2519         p.hashing = true
2520         t.publishPieceStateChange(pi)
2521         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2522         t.storageLock.RLock()
2523         t.activePieceHashes++
2524         go t.pieceHasher(pi)
2525         return true
2526 }
2527
2528 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2529         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2530                 if t.piece(i).hashing {
2531                         return true
2532                 }
2533                 ret = i
2534                 ok = true
2535                 return false
2536         })
2537         return
2538 }
2539
2540 func (t *Torrent) dropBannedPeers() {
2541         t.iterPeers(func(p *Peer) {
2542                 remoteIp := p.remoteIp()
2543                 if remoteIp == nil {
2544                         if p.bannableAddr.Ok {
2545                                 t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
2546                         }
2547                         return
2548                 }
2549                 netipAddr := netip.MustParseAddr(remoteIp.String())
2550                 if Some(netipAddr) != p.bannableAddr {
2551                         t.logger.WithDefaultLevel(log.Debug).Printf(
2552                                 "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
2553                                 p, remoteIp, p.bannableAddr)
2554                 }
2555                 if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
2556                         // Should this be a close?
2557                         p.drop()
2558                         t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
2559                 }
2560         })
2561 }
2562
2563 func (t *Torrent) pieceHasher(index pieceIndex) {
2564         p := t.piece(index)
2565         // Do we really need to spell out that it's a copy error? If it's a failure to hash the hash
2566         // will just be wrong.
2567         correct, failedPeers, copyErr := t.hashPiece(index)
2568         switch copyErr {
2569         case nil, io.EOF:
2570         default:
2571                 t.logger.WithNames("hashing").Levelf(
2572                         log.Warning,
2573                         "error hashing piece %v: %v", index, copyErr)
2574         }
2575         t.storageLock.RUnlock()
2576         t.cl.lock()
2577         defer t.cl.unlock()
2578         if correct {
2579                 for peer := range failedPeers {
2580                         t.cl.banPeerIP(peer.AsSlice())
2581                         t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
2582                 }
2583                 t.dropBannedPeers()
2584                 for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
2585                         t.smartBanCache.ForgetBlock(ri)
2586                 }
2587         }
2588         p.hashing = false
2589         t.pieceHashed(index, correct, copyErr)
2590         t.updatePiecePriority(index, "Torrent.pieceHasher")
2591         t.activePieceHashes--
2592         t.tryCreateMorePieceHashers()
2593 }
2594
2595 // Return the connections that touched a piece, and clear the entries while doing it.
2596 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2597         p := t.piece(pi)
2598         for c := range p.dirtiers {
2599                 delete(c.peerTouchedPieces, pi)
2600                 delete(p.dirtiers, c)
2601         }
2602 }
2603
2604 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2605         t.iterPeers(func(p *Peer) {
2606                 ret = append(ret, p)
2607         })
2608         return
2609 }
2610
2611 func (t *Torrent) queueInitialPieceCheck(i pieceIndex) {
2612         if !t.initialPieceCheckDisabled && !t.piece(i).storageCompletionOk {
2613                 t.queuePieceCheck(i)
2614         }
2615 }
2616
2617 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2618         piece := t.piece(pieceIndex)
2619         if !piece.haveHash() {
2620                 return
2621         }
2622         if piece.queuedForHash() {
2623                 return
2624         }
2625         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2626         t.publishPieceStateChange(pieceIndex)
2627         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2628         t.tryCreateMorePieceHashers()
2629 }
2630
2631 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2632 // before the Info is available.
2633 func (t *Torrent) VerifyData() {
2634         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2635                 t.Piece(i).VerifyData()
2636         }
2637 }
2638
2639 func (t *Torrent) connectingToPeerAddr(addrStr string) bool {
2640         return len(t.halfOpen[addrStr]) != 0
2641 }
2642
2643 func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool {
2644         addrStr := x.String()
2645         for c := range t.conns {
2646                 ra := c.RemoteAddr
2647                 if ra.String() == addrStr {
2648                         return true
2649                 }
2650         }
2651         return false
2652 }
2653
2654 func (t *Torrent) getHalfOpenPath(
2655         addrStr string,
2656         attemptKey outgoingConnAttemptKey,
2657 ) nestedmaps.Path[*PeerInfo] {
2658         return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey)
2659 }
2660
2661 func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) {
2662         path := t.getHalfOpenPath(addrStr, attemptKey)
2663         if path.Exists() {
2664                 panic("should be unique")
2665         }
2666         path.Set(attemptKey)
2667         t.cl.numHalfOpen++
2668 }
2669
2670 // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not
2671 // sure all the PeerInfo fields are being used.
2672 func initiateConn(
2673         opts outgoingConnOpts,
2674         ignoreLimits bool,
2675 ) {
2676         t := opts.t
2677         peer := opts.peerInfo
2678         if peer.Id == t.cl.peerID {
2679                 return
2680         }
2681         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2682                 return
2683         }
2684         addr := peer.Addr
2685         addrStr := addr.String()
2686         if !ignoreLimits {
2687                 if t.connectingToPeerAddr(addrStr) {
2688                         return
2689                 }
2690         }
2691         if t.hasPeerConnForAddr(addr) {
2692                 return
2693         }
2694         attemptKey := &peer
2695         t.addHalfOpen(addrStr, attemptKey)
2696         go t.cl.outgoingConnection(
2697                 opts,
2698                 attemptKey,
2699         )
2700 }
2701
2702 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2703 // quickly make one Client visible to the Torrent of another Client.
2704 func (t *Torrent) AddClientPeer(cl *Client) int {
2705         return t.AddPeers(func() (ps []PeerInfo) {
2706                 for _, la := range cl.ListenAddrs() {
2707                         ps = append(ps, PeerInfo{
2708                                 Addr:    la,
2709                                 Trusted: true,
2710                         })
2711                 }
2712                 return
2713         }())
2714 }
2715
2716 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2717 // connection.
2718 func (t *Torrent) allStats(f func(*ConnStats)) {
2719         f(&t.stats)
2720         f(&t.cl.connStats)
2721 }
2722
2723 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2724         return t.pieces[i].hashing
2725 }
2726
2727 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2728         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2729 }
2730
2731 func (t *Torrent) dialTimeout() time.Duration {
2732         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2733 }
2734
2735 func (t *Torrent) piece(i int) *Piece {
2736         return &t.pieces[i]
2737 }
2738
2739 func (t *Torrent) onWriteChunkErr(err error) {
2740         if t.userOnWriteChunkErr != nil {
2741                 go t.userOnWriteChunkErr(err)
2742                 return
2743         }
2744         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2745         t.disallowDataDownloadLocked()
2746 }
2747
2748 func (t *Torrent) DisallowDataDownload() {
2749         t.cl.lock()
2750         defer t.cl.unlock()
2751         t.disallowDataDownloadLocked()
2752 }
2753
2754 func (t *Torrent) disallowDataDownloadLocked() {
2755         t.dataDownloadDisallowed.Set()
2756         t.iterPeers(func(p *Peer) {
2757                 // Could check if peer request state is empty/not interested?
2758                 p.updateRequests("disallow data download")
2759                 p.cancelAllRequests()
2760         })
2761 }
2762
2763 func (t *Torrent) AllowDataDownload() {
2764         t.cl.lock()
2765         defer t.cl.unlock()
2766         t.dataDownloadDisallowed.Clear()
2767         t.iterPeers(func(p *Peer) {
2768                 p.updateRequests("allow data download")
2769         })
2770 }
2771
2772 // Enables uploading data, if it was disabled.
2773 func (t *Torrent) AllowDataUpload() {
2774         t.cl.lock()
2775         defer t.cl.unlock()
2776         t.dataUploadDisallowed = false
2777         t.iterPeers(func(p *Peer) {
2778                 p.updateRequests("allow data upload")
2779         })
2780 }
2781
2782 // Disables uploading data, if it was enabled.
2783 func (t *Torrent) DisallowDataUpload() {
2784         t.cl.lock()
2785         defer t.cl.unlock()
2786         t.dataUploadDisallowed = true
2787         for c := range t.conns {
2788                 // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
2789                 c.updateRequests("disallow data upload")
2790         }
2791 }
2792
2793 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2794 // or if nil, a critical message is logged, and data download is disabled.
2795 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2796         t.cl.lock()
2797         defer t.cl.unlock()
2798         t.userOnWriteChunkErr = f
2799 }
2800
2801 func (t *Torrent) iterPeers(f func(p *Peer)) {
2802         for pc := range t.conns {
2803                 f(&pc.Peer)
2804         }
2805         for _, ws := range t.webSeeds {
2806                 f(ws)
2807         }
2808 }
2809
2810 func (t *Torrent) callbacks() *Callbacks {
2811         return &t.cl.config.Callbacks
2812 }
2813
2814 type AddWebSeedsOpt func(*webseed.Client)
2815
2816 // Sets the WebSeed trailing path escaper for a webseed.Client.
2817 func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
2818         return func(c *webseed.Client) {
2819                 c.PathEscaper = custom
2820         }
2821 }
2822
2823 func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
2824         t.cl.lock()
2825         defer t.cl.unlock()
2826         for _, u := range urls {
2827                 t.addWebSeed(u, opts...)
2828         }
2829 }
2830
2831 func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
2832         if t.cl.config.DisableWebseeds {
2833                 return
2834         }
2835         if _, ok := t.webSeeds[url]; ok {
2836                 return
2837         }
2838         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2839         // right away. This value should be some multiple of the number of connections to a host. I
2840         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2841         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2842         // "https://webtorrent.io/torrents/".
2843         const maxRequests = 16
2844         ws := webseedPeer{
2845                 peer: Peer{
2846                         t:                        t,
2847                         outgoing:                 true,
2848                         Network:                  "http",
2849                         reconciledHandshakeStats: true,
2850                         // This should affect how often we have to recompute requests for this peer. Note that
2851                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2852                         // requests mark more often, so recomputation is probably sooner than with regular peer
2853                         // conns. ~4x maxRequests would be about right.
2854                         PeerMaxRequests: 128,
2855                         // TODO: Set ban prefix?
2856                         RemoteAddr: remoteAddrFromUrl(url),
2857                         callbacks:  t.callbacks(),
2858                 },
2859                 client: webseed.Client{
2860                         HttpClient: t.cl.httpClient,
2861                         Url:        url,
2862                         ResponseBodyWrapper: func(r io.Reader) io.Reader {
2863                                 return &rateLimitedReader{
2864                                         l: t.cl.config.DownloadRateLimiter,
2865                                         r: r,
2866                                 }
2867                         },
2868                 },
2869                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2870         }
2871         ws.peer.initRequestState()
2872         for _, opt := range opts {
2873                 opt(&ws.client)
2874         }
2875         ws.peer.initUpdateRequestsTimer()
2876         ws.requesterCond.L = t.cl.locker()
2877         for i := 0; i < maxRequests; i += 1 {
2878                 go ws.requester(i)
2879         }
2880         for _, f := range t.callbacks().NewPeer {
2881                 f(&ws.peer)
2882         }
2883         ws.peer.logger = t.logger.WithContextValue(&ws)
2884         ws.peer.peerImpl = &ws
2885         if t.haveInfo() {
2886                 ws.onGotInfo(t.info)
2887         }
2888         t.webSeeds[url] = &ws.peer
2889 }
2890
2891 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2892         t.iterPeers(func(p1 *Peer) {
2893                 if p1 == p {
2894                         active = true
2895                 }
2896         })
2897         return
2898 }
2899
2900 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2901         index := t.pieceIndexOfRequestIndex(ri)
2902         return Request{
2903                 pp.Integer(index),
2904                 t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2905         }
2906 }
2907
2908 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2909         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
2910 }
2911
2912 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2913         return RequestIndex(piece) * t.chunksPerRegularPiece()
2914 }
2915
2916 func (t *Torrent) updateComplete() {
2917         t.Complete.SetBool(t.haveAllPieces())
2918 }
2919
2920 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
2921         p := t.requestingPeer(r)
2922         if p != nil {
2923                 p.cancel(r)
2924         }
2925         // TODO: This is a check that an old invariant holds. It can be removed after some testing.
2926         //delete(t.pendingRequests, r)
2927         if _, ok := t.requestState[r]; ok {
2928                 panic("expected request state to be gone")
2929         }
2930         return p
2931 }
2932
2933 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
2934         return t.requestState[r].peer
2935 }
2936
2937 func (t *Torrent) addConnWithAllPieces(p *Peer) {
2938         if t.connsWithAllPieces == nil {
2939                 t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
2940         }
2941         t.connsWithAllPieces[p] = struct{}{}
2942 }
2943
2944 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
2945         _, ok := t.connsWithAllPieces[p]
2946         delete(t.connsWithAllPieces, p)
2947         return ok
2948 }
2949
2950 func (t *Torrent) numActivePeers() int {
2951         return len(t.conns) + len(t.webSeeds)
2952 }
2953
2954 func (t *Torrent) hasStorageCap() bool {
2955         f := t.storage.Capacity
2956         if f == nil {
2957                 return false
2958         }
2959         _, ok := (*f)()
2960         return ok
2961 }
2962
2963 func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
2964         return pieceIndex(ri / t.chunksPerRegularPiece())
2965 }
2966
2967 func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
2968         reuseIter *typedRoaring.Iterator[RequestIndex],
2969         piece pieceIndex,
2970         f func(RequestIndex),
2971 ) {
2972         reuseIter.Initialize(&t.dirtyChunks)
2973         pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
2974         iterBitmapUnsetInRange(
2975                 reuseIter,
2976                 pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
2977                 f,
2978         )
2979 }
2980
2981 type requestState struct {
2982         peer *Peer
2983         when time.Time
2984 }
2985
2986 // Returns an error if a received chunk is out of bounds in someway.
2987 func (t *Torrent) checkValidReceiveChunk(r Request) error {
2988         if !t.haveInfo() {
2989                 return errors.New("torrent missing info")
2990         }
2991         if int(r.Index) >= t.numPieces() {
2992                 return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
2993         }
2994         pieceLength := t.pieceLength(pieceIndex(r.Index))
2995         if r.Begin >= pieceLength {
2996                 return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
2997         }
2998         // We could check chunk lengths here, but chunk request size is not changed often, and tricky
2999         // for peers to manipulate as they need to send potentially large buffers to begin with. There
3000         // should be considerable checks elsewhere for this case due to the network overhead. We should
3001         // catch most of the overflow manipulation stuff by checking index and begin above.
3002         return nil
3003 }
3004
3005 func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
3006         for pc := range t.conns {
3007                 dialAddr, err := pc.remoteDialAddrPort()
3008                 if err != nil {
3009                         continue
3010                 }
3011                 if dialAddr != target {
3012                         continue
3013                 }
3014                 ret = append(ret, pc)
3015         }
3016         return
3017 }
3018
3019 func wrapUtHolepunchMsgForPeerConn(
3020         recipient *PeerConn,
3021         msg utHolepunch.Msg,
3022 ) pp.Message {
3023         extendedPayload, err := msg.MarshalBinary()
3024         if err != nil {
3025                 panic(err)
3026         }
3027         return pp.Message{
3028                 Type:            pp.Extended,
3029                 ExtendedID:      MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName),
3030                 ExtendedPayload: extendedPayload,
3031         }
3032 }
3033
3034 func sendUtHolepunchMsg(
3035         pc *PeerConn,
3036         msgType utHolepunch.MsgType,
3037         addrPort netip.AddrPort,
3038         errCode utHolepunch.ErrCode,
3039 ) {
3040         holepunchMsg := utHolepunch.Msg{
3041                 MsgType:  msgType,
3042                 AddrPort: addrPort,
3043                 ErrCode:  errCode,
3044         }
3045         incHolepunchMessagesSent(holepunchMsg)
3046         ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg)
3047         pc.write(ppMsg)
3048 }
3049
3050 func incHolepunchMessages(msg utHolepunch.Msg, verb string) {
3051         torrent.Add(
3052                 fmt.Sprintf(
3053                         "holepunch %v %v messages %v",
3054                         msg.MsgType,
3055                         addrPortProtocolStr(msg.AddrPort),
3056                         verb,
3057                 ),
3058                 1,
3059         )
3060 }
3061
3062 func incHolepunchMessagesReceived(msg utHolepunch.Msg) {
3063         incHolepunchMessages(msg, "received")
3064 }
3065
3066 func incHolepunchMessagesSent(msg utHolepunch.Msg) {
3067         incHolepunchMessages(msg, "sent")
3068 }
3069
3070 func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error {
3071         incHolepunchMessagesReceived(msg)
3072         switch msg.MsgType {
3073         case utHolepunch.Rendezvous:
3074                 t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
3075                 sendMsg := sendUtHolepunchMsg
3076                 senderAddrPort, err := sender.remoteDialAddrPort()
3077                 if err != nil {
3078                         sender.logger.Levelf(
3079                                 log.Warning,
3080                                 "error getting ut_holepunch rendezvous sender's dial address: %v",
3081                                 err,
3082                         )
3083                         // There's no better error code. The sender's address itself is invalid. I don't see
3084                         // this error message being appropriate anywhere else anyway.
3085                         sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
3086                 }
3087                 targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
3088                 if len(targets) == 0 {
3089                         sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
3090                         return nil
3091                 }
3092                 for _, pc := range targets {
3093                         if !pc.supportsExtension(utHolepunch.ExtensionName) {
3094                                 sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport)
3095                                 continue
3096                         }
3097                         sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
3098                         sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
3099                 }
3100                 return nil
3101         case utHolepunch.Connect:
3102                 holepunchAddr := msg.AddrPort
3103                 t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender)
3104                 if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) {
3105                         setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr)
3106                         if g.MapContains(t.cl.accepted, holepunchAddr) {
3107                                 setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
3108                         }
3109                 }
3110                 opts := outgoingConnOpts{
3111                         peerInfo: PeerInfo{
3112                                 Addr:         msg.AddrPort,
3113                                 Source:       PeerSourceUtHolepunch,
3114                                 PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(),
3115                         },
3116                         t: t,
3117                         // Don't attempt to start our own rendezvous if we fail to connect.
3118                         skipHolepunchRendezvous:  true,
3119                         receivedHolepunchConnect: true,
3120                         // Assume that the other end initiated the rendezvous, and will use our preferred
3121                         // encryption. So we will act normally.
3122                         HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
3123                 }
3124                 initiateConn(opts, true)
3125                 return nil
3126         case utHolepunch.Error:
3127                 torrent.Add("holepunch error messages received", 1)
3128                 t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
3129                 return nil
3130         default:
3131                 return fmt.Errorf("unhandled msg type %v", msg.MsgType)
3132         }
3133 }
3134
3135 func addrPortProtocolStr(addrPort netip.AddrPort) string {
3136         addr := addrPort.Addr()
3137         switch {
3138         case addr.Is4():
3139                 return "ipv4"
3140         case addr.Is6():
3141                 return "ipv6"
3142         default:
3143                 panic(addrPort)
3144         }
3145 }
3146
3147 func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error {
3148         rzsSent := 0
3149         for pc := range t.conns {
3150                 if !pc.supportsExtension(utHolepunch.ExtensionName) {
3151                         continue
3152                 }
3153                 if pc.supportsExtension(pp.ExtensionNamePex) {
3154                         if !g.MapContains(pc.pex.remoteLiveConns, addrPort) {
3155                                 continue
3156                         }
3157                 }
3158                 t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort)
3159                 sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0)
3160                 rzsSent++
3161         }
3162         if rzsSent == 0 {
3163                 return errors.New("no eligible relays")
3164         }
3165         return nil
3166 }
3167
3168 func (t *Torrent) numHalfOpenAttempts() (num int) {
3169         for _, attempts := range t.halfOpen {
3170                 num += len(attempts)
3171         }
3172         return
3173 }
3174
3175 func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
3176         cl := t.cl
3177         cl.rLock()
3178         defer cl.rUnlock()
3179         return t.dialTimeout()
3180 }
3181
3182 func (t *Torrent) canonicalShortInfohash() *infohash.T {
3183         if t.infoHash.Ok {
3184                 return &t.infoHash.Value
3185         }
3186         return t.infoHashV2.UnwrapPtr().ToShort()
3187 }
3188
3189 func (t *Torrent) eachShortInfohash(each func(short [20]byte)) {
3190         if t.infoHash.Value == *t.infoHashV2.Value.ToShort() {
3191                 // This includes zero values, since they both should not be zero. Plus Option should not
3192                 // allow non-zero values for None.
3193                 panic("v1 and v2 info hashes should not be the same")
3194         }
3195         if t.infoHash.Ok {
3196                 each(t.infoHash.Value)
3197         }
3198         if t.infoHashV2.Ok {
3199                 v2Short := *t.infoHashV2.Value.ToShort()
3200                 each(v2Short)
3201         }
3202 }
3203
3204 func (t *Torrent) getFileByPiecesRoot(hash [32]byte) *File {
3205         for _, f := range *t.files {
3206                 if f.piecesRoot.Unwrap() == hash {
3207                         return f
3208                 }
3209         }
3210         return nil
3211 }
3212
3213 func (t *Torrent) pieceLayers() (pieceLayers map[string]string) {
3214         if t.files == nil {
3215                 return
3216         }
3217         files := *t.files
3218         g.MakeMapWithCap(&pieceLayers, len(files))
3219 file:
3220         for _, f := range files {
3221                 if !f.piecesRoot.Ok {
3222                         continue
3223                 }
3224                 key := f.piecesRoot.Value
3225                 var value strings.Builder
3226                 for i := f.BeginPieceIndex(); i < f.EndPieceIndex(); i++ {
3227                         hashOpt := t.piece(i).hashV2
3228                         if !hashOpt.Ok {
3229                                 // All hashes must be present. This implementation should handle missing files, so
3230                                 // move on to the next file.
3231                                 continue file
3232                         }
3233                         value.Write(hashOpt.Value[:])
3234                 }
3235                 if value.Len() == 0 {
3236                         // Non-empty files are not recorded in piece layers.
3237                         continue
3238                 }
3239                 // If multiple files have the same root that shouldn't matter.
3240                 pieceLayers[string(key[:])] = value.String()
3241         }
3242         return
3243 }