]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Order peer conn status by peer priority and show webseeds separately
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "math/rand"
12         "net/netip"
13         "net/url"
14         "sort"
15         "strings"
16         "text/tabwriter"
17         "time"
18         "unsafe"
19
20         "github.com/RoaringBitmap/roaring"
21         "github.com/anacrolix/chansync"
22         "github.com/anacrolix/chansync/events"
23         "github.com/anacrolix/dht/v2"
24         . "github.com/anacrolix/generics"
25         g "github.com/anacrolix/generics"
26         "github.com/anacrolix/log"
27         "github.com/anacrolix/missinggo/perf"
28         "github.com/anacrolix/missinggo/slices"
29         "github.com/anacrolix/missinggo/v2"
30         "github.com/anacrolix/missinggo/v2/bitmap"
31         "github.com/anacrolix/missinggo/v2/pubsub"
32         "github.com/anacrolix/multiless"
33         "github.com/anacrolix/sync"
34         "github.com/davecgh/go-spew/spew"
35         "github.com/pion/datachannel"
36         "golang.org/x/exp/maps"
37
38         "github.com/anacrolix/torrent/bencode"
39         "github.com/anacrolix/torrent/common"
40         "github.com/anacrolix/torrent/metainfo"
41         pp "github.com/anacrolix/torrent/peer_protocol"
42         request_strategy "github.com/anacrolix/torrent/request-strategy"
43         "github.com/anacrolix/torrent/segments"
44         "github.com/anacrolix/torrent/storage"
45         "github.com/anacrolix/torrent/tracker"
46         typedRoaring "github.com/anacrolix/torrent/typed-roaring"
47         "github.com/anacrolix/torrent/webseed"
48         "github.com/anacrolix/torrent/webtorrent"
49 )
50
51 // Maintains state of torrent within a Client. Many methods should not be called before the info is
52 // available, see .Info and .GotInfo.
53 type Torrent struct {
54         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
55         // alignment. See #262.
56         stats  ConnStats
57         cl     *Client
58         logger log.Logger
59
60         networkingEnabled      chansync.Flag
61         dataDownloadDisallowed chansync.Flag
62         dataUploadDisallowed   bool
63         userOnWriteChunkErr    func(error)
64
65         closed   chansync.SetOnce
66         onClose  []func()
67         infoHash metainfo.Hash
68         pieces   []Piece
69
70         // The order pieces are requested if there's no stronger reason like availability or priority.
71         pieceRequestOrder []int
72         // Values are the piece indices that changed.
73         pieceStateChanges pubsub.PubSub[PieceStateChange]
74         // The size of chunks to request from peers over the wire. This is
75         // normally 16KiB by convention these days.
76         chunkSize pp.Integer
77         chunkPool sync.Pool
78         // Total length of the torrent in bytes. Stored because it's not O(1) to
79         // get this from the info dict.
80         _length Option[int64]
81
82         // The storage to open when the info dict becomes available.
83         storageOpener *storage.Client
84         // Storage for torrent data.
85         storage *storage.Torrent
86         // Read-locked for using storage, and write-locked for Closing.
87         storageLock sync.RWMutex
88
89         // TODO: Only announce stuff is used?
90         metainfo metainfo.MetaInfo
91
92         // The info dict. nil if we don't have it (yet).
93         info      *metainfo.Info
94         fileIndex segments.Index
95         files     *[]*File
96
97         _chunksPerRegularPiece chunkIndexType
98
99         webSeeds map[string]*Peer
100         // Active peer connections, running message stream loops. TODO: Make this
101         // open (not-closed) connections only.
102         conns               map[*PeerConn]struct{}
103         maxEstablishedConns int
104         // Set of addrs to which we're attempting to connect. Connections are
105         // half-open until all handshakes are completed.
106         halfOpen map[string]PeerInfo
107
108         // Reserve of peers to connect to. A peer can be both here and in the
109         // active connections if were told about the peer after connecting with
110         // them. That encourages us to reconnect to peers that are well known in
111         // the swarm.
112         peers prioritizedPeers
113         // Whether we want to know more peers.
114         wantPeersEvent missinggo.Event
115         // An announcer for each tracker URL.
116         trackerAnnouncers map[string]torrentTrackerAnnouncer
117         // How many times we've initiated a DHT announce. TODO: Move into stats.
118         numDHTAnnounces int
119
120         // Name used if the info name isn't available. Should be cleared when the
121         // Info does become available.
122         nameMu      sync.RWMutex
123         displayName string
124
125         // The bencoded bytes of the info dict. This is actively manipulated if
126         // the info bytes aren't initially available, and we try to fetch them
127         // from peers.
128         metadataBytes []byte
129         // Each element corresponds to the 16KiB metadata pieces. If true, we have
130         // received that piece.
131         metadataCompletedChunks []bool
132         metadataChanged         sync.Cond
133
134         // Closed when .Info is obtained.
135         gotMetainfoC chan struct{}
136
137         readers                map[*reader]struct{}
138         _readerNowPieces       bitmap.Bitmap
139         _readerReadaheadPieces bitmap.Bitmap
140
141         // A cache of pieces we need to get. Calculated from various piece and
142         // file priorities and completion states elsewhere.
143         _pendingPieces roaring.Bitmap
144         // A cache of completed piece indices.
145         _completedPieces roaring.Bitmap
146         // Pieces that need to be hashed.
147         piecesQueuedForHash       bitmap.Bitmap
148         activePieceHashes         int
149         initialPieceCheckDisabled bool
150
151         connsWithAllPieces map[*Peer]struct{}
152
153         requestState map[RequestIndex]requestState
154         // Chunks we've written to since the corresponding piece was last checked.
155         dirtyChunks typedRoaring.Bitmap[RequestIndex]
156
157         pex pexState
158
159         // Is On when all pieces are complete.
160         Complete chansync.Flag
161
162         // Torrent sources in use keyed by the source string.
163         activeSources sync.Map
164         sourcesLogger log.Logger
165
166         smartBanCache smartBanCache
167
168         // Large allocations reused between request state updates.
169         requestPieceStates []request_strategy.PieceRequestOrderState
170         requestIndexes     []RequestIndex
171 }
172
173 func (t *Torrent) length() int64 {
174         return t._length.Value
175 }
176
177 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
178         // This could be done with roaring.BitSliceIndexing.
179         t.iterPeers(func(peer *Peer) {
180                 if _, ok := t.connsWithAllPieces[peer]; ok {
181                         return
182                 }
183                 if peer.peerHasPiece(i) {
184                         count++
185                 }
186         })
187         return
188 }
189
190 func (t *Torrent) decPieceAvailability(i pieceIndex) {
191         if !t.haveInfo() {
192                 return
193         }
194         p := t.piece(i)
195         if p.relativeAvailability <= 0 {
196                 panic(p.relativeAvailability)
197         }
198         p.relativeAvailability--
199         t.updatePieceRequestOrder(i)
200 }
201
202 func (t *Torrent) incPieceAvailability(i pieceIndex) {
203         // If we don't the info, this should be reconciled when we do.
204         if t.haveInfo() {
205                 p := t.piece(i)
206                 p.relativeAvailability++
207                 t.updatePieceRequestOrder(i)
208         }
209 }
210
211 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
212         return t._readerNowPieces
213 }
214
215 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
216         return t._readerReadaheadPieces
217 }
218
219 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
220         return !t.wantPieceIndex(i)
221 }
222
223 // Returns a channel that is closed when the Torrent is closed.
224 func (t *Torrent) Closed() events.Done {
225         return t.closed.Done()
226 }
227
228 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
229 // pending, and half-open peers.
230 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
231         // Add pending peers to the list
232         t.peers.Each(func(peer PeerInfo) {
233                 ks = append(ks, peer)
234         })
235
236         // Add half-open peers to the list
237         for _, peer := range t.halfOpen {
238                 ks = append(ks, peer)
239         }
240
241         // Add active peers to the list
242         for conn := range t.conns {
243                 ks = append(ks, PeerInfo{
244                         Id:     conn.PeerID,
245                         Addr:   conn.RemoteAddr,
246                         Source: conn.Discovery,
247                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
248                         // > But if we're not connected to them with an encrypted connection, I couldn't say
249                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
250                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
251                         // > connection. It's probably easiest to do the latter for now.
252                         // https://github.com/anacrolix/torrent/pull/188
253                         SupportsEncryption: conn.headerEncrypted,
254                 })
255         }
256
257         return
258 }
259
260 func (t *Torrent) setChunkSize(size pp.Integer) {
261         t.chunkSize = size
262         t.chunkPool = sync.Pool{
263                 New: func() interface{} {
264                         b := make([]byte, size)
265                         return &b
266                 },
267         }
268 }
269
270 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
271         return t._completedPieces.Contains(bitmap.BitIndex(piece))
272 }
273
274 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
275         if t.storage == nil {
276                 return storage.Completion{Complete: false, Ok: true}
277         }
278         return t.pieces[piece].Storage().Completion()
279 }
280
281 // There's a connection to that address already.
282 func (t *Torrent) addrActive(addr string) bool {
283         if _, ok := t.halfOpen[addr]; ok {
284                 return true
285         }
286         for c := range t.conns {
287                 ra := c.RemoteAddr
288                 if ra.String() == addr {
289                         return true
290                 }
291         }
292         return false
293 }
294
295 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
296         return t.appendConns(ret, func(conn *PeerConn) bool {
297                 return !conn.closed.IsSet()
298         })
299 }
300
301 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
302         for c := range t.conns {
303                 if f(c) {
304                         ret = append(ret, c)
305                 }
306         }
307         return ret
308 }
309
310 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
311         cl := t.cl
312         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
313         if t.closed.IsSet() {
314                 return false
315         }
316         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
317                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
318                         torrent.Add("peers not added because of bad addr", 1)
319                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
320                         return false
321                 }
322         }
323         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
324                 torrent.Add("peers replaced", 1)
325                 if !replaced.equal(p) {
326                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
327                         added = true
328                 }
329         } else {
330                 added = true
331         }
332         t.openNewConns()
333         for t.peers.Len() > cl.config.TorrentPeersHighWater {
334                 _, ok := t.peers.DeleteMin()
335                 if ok {
336                         torrent.Add("excess reserve peers discarded", 1)
337                 }
338         }
339         return
340 }
341
342 func (t *Torrent) invalidateMetadata() {
343         for i := 0; i < len(t.metadataCompletedChunks); i++ {
344                 t.metadataCompletedChunks[i] = false
345         }
346         t.nameMu.Lock()
347         t.gotMetainfoC = make(chan struct{})
348         t.info = nil
349         t.nameMu.Unlock()
350 }
351
352 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
353         if t.haveInfo() {
354                 return
355         }
356         if index >= len(t.metadataCompletedChunks) {
357                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
358                 return
359         }
360         copy(t.metadataBytes[(1<<14)*index:], data)
361         t.metadataCompletedChunks[index] = true
362 }
363
364 func (t *Torrent) metadataPieceCount() int {
365         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
366 }
367
368 func (t *Torrent) haveMetadataPiece(piece int) bool {
369         if t.haveInfo() {
370                 return (1<<14)*piece < len(t.metadataBytes)
371         } else {
372                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
373         }
374 }
375
376 func (t *Torrent) metadataSize() int {
377         return len(t.metadataBytes)
378 }
379
380 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
381         for i := 0; i < len(info.Pieces); i += sha1.Size {
382                 ret = append(ret, info.Pieces[i:i+sha1.Size])
383         }
384         return
385 }
386
387 func (t *Torrent) makePieces() {
388         hashes := infoPieceHashes(t.info)
389         t.pieces = make([]Piece, len(hashes))
390         for i, hash := range hashes {
391                 piece := &t.pieces[i]
392                 piece.t = t
393                 piece.index = pieceIndex(i)
394                 piece.noPendingWrites.L = &piece.pendingWritesMutex
395                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
396                 files := *t.files
397                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
398                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
399                 piece.files = files[beginFile:endFile]
400         }
401 }
402
403 // Returns the index of the first file containing the piece. files must be
404 // ordered by offset.
405 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
406         for i, f := range files {
407                 if f.offset+f.length > pieceOffset {
408                         return i
409                 }
410         }
411         return 0
412 }
413
414 // Returns the index after the last file containing the piece. files must be
415 // ordered by offset.
416 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
417         for i, f := range files {
418                 if f.offset+f.length >= pieceEndOffset {
419                         return i + 1
420                 }
421         }
422         return 0
423 }
424
425 func (t *Torrent) cacheLength() {
426         var l int64
427         for _, f := range t.info.UpvertedFiles() {
428                 l += f.Length
429         }
430         t._length = Some(l)
431 }
432
433 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
434 // separately.
435 func (t *Torrent) setInfo(info *metainfo.Info) error {
436         if err := validateInfo(info); err != nil {
437                 return fmt.Errorf("bad info: %s", err)
438         }
439         if t.storageOpener != nil {
440                 var err error
441                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
442                 if err != nil {
443                         return fmt.Errorf("error opening torrent storage: %s", err)
444                 }
445         }
446         t.nameMu.Lock()
447         t.info = info
448         t.nameMu.Unlock()
449         t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
450         t.updateComplete()
451         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
452         t.displayName = "" // Save a few bytes lol.
453         t.initFiles()
454         t.cacheLength()
455         t.makePieces()
456         return nil
457 }
458
459 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
460         return request_strategy.PieceRequestOrderKey{
461                 InfoHash: t.infoHash,
462                 Index:    i,
463         }
464 }
465
466 // This seems to be all the follow-up tasks after info is set, that can't fail.
467 func (t *Torrent) onSetInfo() {
468         t.pieceRequestOrder = rand.Perm(t.numPieces())
469         t.initPieceRequestOrder()
470         MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
471         for i := range t.pieces {
472                 p := &t.pieces[i]
473                 // Need to add relativeAvailability before updating piece completion, as that may result in conns
474                 // being dropped.
475                 if p.relativeAvailability != 0 {
476                         panic(p.relativeAvailability)
477                 }
478                 p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
479                 t.addRequestOrderPiece(i)
480                 t.updatePieceCompletion(i)
481                 if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
482                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
483                         t.queuePieceCheck(i)
484                 }
485         }
486         t.cl.event.Broadcast()
487         close(t.gotMetainfoC)
488         t.updateWantPeersEvent()
489         t.requestState = make(map[RequestIndex]requestState)
490         t.tryCreateMorePieceHashers()
491         t.iterPeers(func(p *Peer) {
492                 p.onGotInfo(t.info)
493                 p.updateRequests("onSetInfo")
494         })
495 }
496
497 // Called when metadata for a torrent becomes available.
498 func (t *Torrent) setInfoBytesLocked(b []byte) error {
499         if metainfo.HashBytes(b) != t.infoHash {
500                 return errors.New("info bytes have wrong hash")
501         }
502         var info metainfo.Info
503         if err := bencode.Unmarshal(b, &info); err != nil {
504                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
505         }
506         t.metadataBytes = b
507         t.metadataCompletedChunks = nil
508         if t.info != nil {
509                 return nil
510         }
511         if err := t.setInfo(&info); err != nil {
512                 return err
513         }
514         t.onSetInfo()
515         return nil
516 }
517
518 func (t *Torrent) haveAllMetadataPieces() bool {
519         if t.haveInfo() {
520                 return true
521         }
522         if t.metadataCompletedChunks == nil {
523                 return false
524         }
525         for _, have := range t.metadataCompletedChunks {
526                 if !have {
527                         return false
528                 }
529         }
530         return true
531 }
532
533 // TODO: Propagate errors to disconnect peer.
534 func (t *Torrent) setMetadataSize(size int) (err error) {
535         if t.haveInfo() {
536                 // We already know the correct metadata size.
537                 return
538         }
539         if uint32(size) > maxMetadataSize {
540                 return log.WithLevel(log.Warning, errors.New("bad size"))
541         }
542         if len(t.metadataBytes) == size {
543                 return
544         }
545         t.metadataBytes = make([]byte, size)
546         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
547         t.metadataChanged.Broadcast()
548         for c := range t.conns {
549                 c.requestPendingMetadata()
550         }
551         return
552 }
553
554 // The current working name for the torrent. Either the name in the info dict,
555 // or a display name given such as by the dn value in a magnet link, or "".
556 func (t *Torrent) name() string {
557         t.nameMu.RLock()
558         defer t.nameMu.RUnlock()
559         if t.haveInfo() {
560                 return t.info.BestName()
561         }
562         if t.displayName != "" {
563                 return t.displayName
564         }
565         return "infohash:" + t.infoHash.HexString()
566 }
567
568 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
569         p := &t.pieces[index]
570         ret.Priority = t.piecePriority(index)
571         ret.Completion = p.completion()
572         ret.QueuedForHash = p.queuedForHash()
573         ret.Hashing = p.hashing
574         ret.Checking = ret.QueuedForHash || ret.Hashing
575         ret.Marking = p.marking
576         if !ret.Complete && t.piecePartiallyDownloaded(index) {
577                 ret.Partial = true
578         }
579         return
580 }
581
582 func (t *Torrent) metadataPieceSize(piece int) int {
583         return metadataPieceSize(len(t.metadataBytes), piece)
584 }
585
586 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
587         return pp.Message{
588                 Type:       pp.Extended,
589                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
590                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
591                         Piece:     piece,
592                         TotalSize: len(t.metadataBytes),
593                         Type:      msgType,
594                 }), data...),
595         }
596 }
597
598 type pieceAvailabilityRun struct {
599         Count        pieceIndex
600         Availability int
601 }
602
603 func (me pieceAvailabilityRun) String() string {
604         return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
605 }
606
607 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
608         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
609                 ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
610         })
611         for i := range t.pieces {
612                 rle.Append(t.pieces[i].availability(), 1)
613         }
614         rle.Flush()
615         return
616 }
617
618 func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
619         freqs = make([]int, t.numActivePeers()+1)
620         for i := range t.pieces {
621                 freqs[t.piece(i).availability()]++
622         }
623         return
624 }
625
626 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
627         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
628                 ret = append(ret, PieceStateRun{
629                         PieceState: el.(PieceState),
630                         Length:     int(count),
631                 })
632         })
633         for index := range t.pieces {
634                 rle.Append(t.pieceState(pieceIndex(index)), 1)
635         }
636         rle.Flush()
637         return
638 }
639
640 // Produces a small string representing a PieceStateRun.
641 func (psr PieceStateRun) String() (ret string) {
642         ret = fmt.Sprintf("%d", psr.Length)
643         ret += func() string {
644                 switch psr.Priority {
645                 case PiecePriorityNext:
646                         return "N"
647                 case PiecePriorityNormal:
648                         return "."
649                 case PiecePriorityReadahead:
650                         return "R"
651                 case PiecePriorityNow:
652                         return "!"
653                 case PiecePriorityHigh:
654                         return "H"
655                 default:
656                         return ""
657                 }
658         }()
659         if psr.Hashing {
660                 ret += "H"
661         }
662         if psr.QueuedForHash {
663                 ret += "Q"
664         }
665         if psr.Marking {
666                 ret += "M"
667         }
668         if psr.Partial {
669                 ret += "P"
670         }
671         if psr.Complete {
672                 ret += "C"
673         }
674         if !psr.Ok {
675                 ret += "?"
676         }
677         return
678 }
679
680 func (t *Torrent) writeStatus(w io.Writer) {
681         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
682         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
683         if !t.haveInfo() {
684                 fmt.Fprintf(w, "Metadata have: ")
685                 for _, h := range t.metadataCompletedChunks {
686                         fmt.Fprintf(w, "%c", func() rune {
687                                 if h {
688                                         return 'H'
689                                 } else {
690                                         return '.'
691                                 }
692                         }())
693                 }
694                 fmt.Fprintln(w)
695         }
696         fmt.Fprintf(w, "Piece length: %s\n",
697                 func() string {
698                         if t.haveInfo() {
699                                 return fmt.Sprintf("%v (%v chunks)",
700                                         t.usualPieceSize(),
701                                         float64(t.usualPieceSize())/float64(t.chunkSize))
702                         } else {
703                                 return "no info"
704                         }
705                 }(),
706         )
707         if t.info != nil {
708                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
709                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
710                 // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
711                 // availability frequencies instead.
712                 if false {
713                         fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
714                                 for _, run := range t.pieceAvailabilityRuns() {
715                                         ret = append(ret, run.String())
716                                 }
717                                 return
718                         }(), " "))
719                 }
720                 fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
721                         func() (ret []string) {
722                                 for avail, freq := range t.pieceAvailabilityFrequencies() {
723                                         if freq == 0 {
724                                                 continue
725                                         }
726                                         ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
727                                 }
728                                 return
729                         }(),
730                         ", "))
731         }
732         fmt.Fprintf(w, "Reader Pieces:")
733         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
734                 fmt.Fprintf(w, " %d:%d", begin, end)
735                 return true
736         })
737         fmt.Fprintln(w)
738
739         fmt.Fprintf(w, "Enabled trackers:\n")
740         func() {
741                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
742                 fmt.Fprintf(tw, "    URL\tExtra\n")
743                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
744                         lu := l.URL()
745                         ru := r.URL()
746                         var luns, runs url.URL = *lu, *ru
747                         luns.Scheme = ""
748                         runs.Scheme = ""
749                         var ml missinggo.MultiLess
750                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
751                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
752                         return ml.Less()
753                 }).([]torrentTrackerAnnouncer) {
754                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
755                 }
756                 tw.Flush()
757         }()
758
759         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
760
761         spew.NewDefaultConfig()
762         spew.Fdump(w, t.statsLocked())
763
764         fmt.Fprintf(w, "webseeds:\n")
765         t.writePeerStatuses(w, maps.Values(t.webSeeds))
766
767         peerConns := maps.Keys(t.conns)
768         // Peers without priorities first, then those with. I'm undecided about how to order peers
769         // without priorities.
770         sort.Slice(peerConns, func(li, ri int) bool {
771                 l := peerConns[li]
772                 r := peerConns[ri]
773                 ml := multiless.New()
774                 lpp := g.ResultFromTuple(l.peerPriority()).ToOption()
775                 rpp := g.ResultFromTuple(r.peerPriority()).ToOption()
776                 ml = ml.Bool(lpp.Ok, rpp.Ok)
777                 ml = ml.Uint32(rpp.Value, lpp.Value)
778                 return ml.Less()
779         })
780
781         fmt.Fprintf(w, "peer conns:\n")
782         t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer {
783                 return &pc.Peer
784         }))
785 }
786
787 func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) {
788         var buf bytes.Buffer
789         for _, c := range peers {
790                 fmt.Fprintf(w, "- ")
791                 buf.Reset()
792                 c.writeStatus(&buf)
793                 w.Write(bytes.TrimRight(
794                         bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n  ")),
795                         " "))
796         }
797 }
798
799 func (t *Torrent) haveInfo() bool {
800         return t.info != nil
801 }
802
803 // Returns a run-time generated MetaInfo that includes the info bytes and
804 // announce-list as currently known to the client.
805 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
806         return metainfo.MetaInfo{
807                 CreationDate: time.Now().Unix(),
808                 Comment:      "dynamic metainfo from client",
809                 CreatedBy:    "go.torrent",
810                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
811                 InfoBytes: func() []byte {
812                         if t.haveInfo() {
813                                 return t.metadataBytes
814                         } else {
815                                 return nil
816                         }
817                 }(),
818                 UrlList: func() []string {
819                         ret := make([]string, 0, len(t.webSeeds))
820                         for url := range t.webSeeds {
821                                 ret = append(ret, url)
822                         }
823                         return ret
824                 }(),
825         }
826 }
827
828 // Get bytes left
829 func (t *Torrent) BytesMissing() (n int64) {
830         t.cl.rLock()
831         n = t.bytesMissingLocked()
832         t.cl.rUnlock()
833         return
834 }
835
836 func (t *Torrent) bytesMissingLocked() int64 {
837         return t.bytesLeft()
838 }
839
840 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
841         roaring.Flip(b, 0, end).Iterate(cb)
842 }
843
844 func (t *Torrent) bytesLeft() (left int64) {
845         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
846                 p := t.piece(pieceIndex(x))
847                 left += int64(p.length() - p.numDirtyBytes())
848                 return true
849         })
850         return
851 }
852
853 // Bytes left to give in tracker announces.
854 func (t *Torrent) bytesLeftAnnounce() int64 {
855         if t.haveInfo() {
856                 return t.bytesLeft()
857         } else {
858                 return -1
859         }
860 }
861
862 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
863         if t.pieceComplete(piece) {
864                 return false
865         }
866         if t.pieceAllDirty(piece) {
867                 return false
868         }
869         return t.pieces[piece].hasDirtyChunks()
870 }
871
872 func (t *Torrent) usualPieceSize() int {
873         return int(t.info.PieceLength)
874 }
875
876 func (t *Torrent) numPieces() pieceIndex {
877         return t.info.NumPieces()
878 }
879
880 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
881         return pieceIndex(t._completedPieces.GetCardinality())
882 }
883
884 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
885         if !t.closed.Set() {
886                 err = errors.New("already closed")
887                 return
888         }
889         for _, f := range t.onClose {
890                 f()
891         }
892         if t.storage != nil {
893                 wg.Add(1)
894                 go func() {
895                         defer wg.Done()
896                         t.storageLock.Lock()
897                         defer t.storageLock.Unlock()
898                         if f := t.storage.Close; f != nil {
899                                 err1 := f()
900                                 if err1 != nil {
901                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
902                                 }
903                         }
904                 }()
905         }
906         t.iterPeers(func(p *Peer) {
907                 p.close()
908         })
909         if t.storage != nil {
910                 t.deletePieceRequestOrder()
911         }
912         t.assertAllPiecesRelativeAvailabilityZero()
913         t.pex.Reset()
914         t.cl.event.Broadcast()
915         t.pieceStateChanges.Close()
916         t.updateWantPeersEvent()
917         return
918 }
919
920 func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
921         for i := range t.pieces {
922                 p := t.piece(i)
923                 if p.relativeAvailability != 0 {
924                         panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
925                 }
926         }
927 }
928
929 func (t *Torrent) requestOffset(r Request) int64 {
930         return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
931 }
932
933 // Return the request that would include the given offset into the torrent data. Returns !ok if
934 // there is no such request.
935 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
936         return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
937 }
938
939 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
940         defer perf.ScopeTimerErr(&err)()
941         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
942         if err == nil && n != len(data) {
943                 err = io.ErrShortWrite
944         }
945         return err
946 }
947
948 func (t *Torrent) bitfield() (bf []bool) {
949         bf = make([]bool, t.numPieces())
950         t._completedPieces.Iterate(func(piece uint32) (again bool) {
951                 bf[piece] = true
952                 return true
953         })
954         return
955 }
956
957 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
958         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
959 }
960
961 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
962         return t._chunksPerRegularPiece
963 }
964
965 func (t *Torrent) numChunks() RequestIndex {
966         if t.numPieces() == 0 {
967                 return 0
968         }
969         return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
970 }
971
972 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
973         t.dirtyChunks.RemoveRange(
974                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
975                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
976 }
977
978 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
979         if t.info.PieceLength == 0 {
980                 // There will be no variance amongst pieces. Only pain.
981                 return 0
982         }
983         if piece == t.numPieces()-1 {
984                 ret := pp.Integer(t.length() % t.info.PieceLength)
985                 if ret != 0 {
986                         return ret
987                 }
988         }
989         return pp.Integer(t.info.PieceLength)
990 }
991
992 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
993         return &blockCheckingWriter{
994                 cache:        &t.smartBanCache,
995                 requestIndex: t.pieceRequestIndexOffset(piece),
996                 chunkSize:    t.chunkSize.Int(),
997         }
998 }
999
1000 func (t *Torrent) hashPiece(piece pieceIndex) (
1001         ret metainfo.Hash,
1002         // These are peers that sent us blocks that differ from what we hash here.
1003         differingPeers map[bannableAddr]struct{},
1004         err error,
1005 ) {
1006         p := t.piece(piece)
1007         p.waitNoPendingWrites()
1008         storagePiece := t.pieces[piece].Storage()
1009
1010         // Does the backend want to do its own hashing?
1011         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
1012                 var sum metainfo.Hash
1013                 // log.Printf("A piece decided to self-hash: %d", piece)
1014                 sum, err = i.SelfHash()
1015                 missinggo.CopyExact(&ret, sum)
1016                 return
1017         }
1018
1019         hash := pieceHash.New()
1020         const logPieceContents = false
1021         smartBanWriter := t.smartBanBlockCheckingWriter(piece)
1022         writers := []io.Writer{hash, smartBanWriter}
1023         var examineBuf bytes.Buffer
1024         if logPieceContents {
1025                 writers = append(writers, &examineBuf)
1026         }
1027         _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
1028         if logPieceContents {
1029                 t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
1030         }
1031         smartBanWriter.Flush()
1032         differingPeers = smartBanWriter.badPeers
1033         missinggo.CopyExact(&ret, hash.Sum(nil))
1034         return
1035 }
1036
1037 func (t *Torrent) haveAnyPieces() bool {
1038         return !t._completedPieces.IsEmpty()
1039 }
1040
1041 func (t *Torrent) haveAllPieces() bool {
1042         if !t.haveInfo() {
1043                 return false
1044         }
1045         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
1046 }
1047
1048 func (t *Torrent) havePiece(index pieceIndex) bool {
1049         return t.haveInfo() && t.pieceComplete(index)
1050 }
1051
1052 func (t *Torrent) maybeDropMutuallyCompletePeer(
1053         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
1054         // okay?
1055         p *Peer,
1056 ) {
1057         if !t.cl.config.DropMutuallyCompletePeers {
1058                 return
1059         }
1060         if !t.haveAllPieces() {
1061                 return
1062         }
1063         if all, known := p.peerHasAllPieces(); !(known && all) {
1064                 return
1065         }
1066         if p.useful() {
1067                 return
1068         }
1069         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
1070         p.drop()
1071 }
1072
1073 func (t *Torrent) haveChunk(r Request) (ret bool) {
1074         // defer func() {
1075         //      log.Println("have chunk", r, ret)
1076         // }()
1077         if !t.haveInfo() {
1078                 return false
1079         }
1080         if t.pieceComplete(pieceIndex(r.Index)) {
1081                 return true
1082         }
1083         p := &t.pieces[r.Index]
1084         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
1085 }
1086
1087 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
1088         return chunkIndexType(cs.Begin / chunkSize)
1089 }
1090
1091 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
1092         return t._pendingPieces.Contains(uint32(index))
1093 }
1094
1095 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1096 // conns (which is a map).
1097 var peerConnSlices sync.Pool
1098
1099 func getPeerConnSlice(cap int) []*PeerConn {
1100         getInterface := peerConnSlices.Get()
1101         if getInterface == nil {
1102                 return make([]*PeerConn, 0, cap)
1103         } else {
1104                 return getInterface.([]*PeerConn)[:0]
1105         }
1106 }
1107
1108 // Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
1109 // this is a frequent occurrence.
1110 func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
1111         sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
1112         f(sl)
1113         peerConnSlices.Put(sl)
1114 }
1115
1116 func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
1117         wcs := worseConnSlice{conns: sl}
1118         wcs.initKeys(opts)
1119         heap.Init(&wcs)
1120         for wcs.Len() != 0 {
1121                 c := heap.Pop(&wcs).(*PeerConn)
1122                 if opts.incomingIsBad && !c.outgoing {
1123                         return c
1124                 }
1125                 if opts.outgoingIsBad && c.outgoing {
1126                         return c
1127                 }
1128                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1129                         return c
1130                 }
1131                 // If the connection is in the worst half of the established
1132                 // connection quota and is older than a minute.
1133                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1134                         // Give connections 1 minute to prove themselves.
1135                         if time.Since(c.completedHandshake) > time.Minute {
1136                                 return c
1137                         }
1138                 }
1139         }
1140         return nil
1141 }
1142
1143 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1144 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1145 // established connections for more than a minute. This is O(n log n). If there was a way to not
1146 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1147 func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
1148         t.withUnclosedConns(func(ucs []*PeerConn) {
1149                 ret = t.worstBadConnFromSlice(opts, ucs)
1150         })
1151         return
1152 }
1153
1154 type PieceStateChange struct {
1155         Index int
1156         PieceState
1157 }
1158
1159 func (t *Torrent) publishPieceChange(piece pieceIndex) {
1160         t.cl._mu.Defer(func() {
1161                 cur := t.pieceState(piece)
1162                 p := &t.pieces[piece]
1163                 if cur != p.publicPieceState {
1164                         p.publicPieceState = cur
1165                         t.pieceStateChanges.Publish(PieceStateChange{
1166                                 int(piece),
1167                                 cur,
1168                         })
1169                 }
1170         })
1171 }
1172
1173 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1174         if t.pieceComplete(piece) {
1175                 return 0
1176         }
1177         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1178 }
1179
1180 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1181         return t.pieces[piece].allChunksDirty()
1182 }
1183
1184 func (t *Torrent) readersChanged() {
1185         t.updateReaderPieces()
1186         t.updateAllPiecePriorities("Torrent.readersChanged")
1187 }
1188
1189 func (t *Torrent) updateReaderPieces() {
1190         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1191 }
1192
1193 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1194         if from == to {
1195                 return
1196         }
1197         t.updateReaderPieces()
1198         // Order the ranges, high and low.
1199         l, h := from, to
1200         if l.begin > h.begin {
1201                 l, h = h, l
1202         }
1203         if l.end < h.begin {
1204                 // Two distinct ranges.
1205                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1206                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1207         } else {
1208                 // Ranges overlap.
1209                 end := l.end
1210                 if h.end > end {
1211                         end = h.end
1212                 }
1213                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1214         }
1215 }
1216
1217 func (t *Torrent) maybeNewConns() {
1218         // Tickle the accept routine.
1219         t.cl.event.Broadcast()
1220         t.openNewConns()
1221 }
1222
1223 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
1224         if t._pendingPieces.Contains(uint32(piece)) {
1225                 t.iterPeers(func(c *Peer) {
1226                         // if c.requestState.Interested {
1227                         //      return
1228                         // }
1229                         if !c.isLowOnRequests() {
1230                                 return
1231                         }
1232                         if !c.peerHasPiece(piece) {
1233                                 return
1234                         }
1235                         if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
1236                                 return
1237                         }
1238                         c.updateRequests(reason)
1239                 })
1240         }
1241         t.maybeNewConns()
1242         t.publishPieceChange(piece)
1243 }
1244
1245 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1246         if !t.closed.IsSet() {
1247                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1248                 // request order.
1249                 t.updatePieceRequestOrder(piece)
1250         }
1251         p := &t.pieces[piece]
1252         newPrio := p.uncachedPriority()
1253         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1254         if newPrio == PiecePriorityNone {
1255                 if !t._pendingPieces.CheckedRemove(uint32(piece)) {
1256                         return
1257                 }
1258         } else {
1259                 if !t._pendingPieces.CheckedAdd(uint32(piece)) {
1260                         return
1261                 }
1262         }
1263         t.piecePriorityChanged(piece, reason)
1264 }
1265
1266 func (t *Torrent) updateAllPiecePriorities(reason string) {
1267         t.updatePiecePriorities(0, t.numPieces(), reason)
1268 }
1269
1270 // Update all piece priorities in one hit. This function should have the same
1271 // output as updatePiecePriority, but across all pieces.
1272 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1273         for i := begin; i < end; i++ {
1274                 t.updatePiecePriority(i, reason)
1275         }
1276 }
1277
1278 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1279 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1280         if off >= t.length() {
1281                 return
1282         }
1283         if off < 0 {
1284                 size += off
1285                 off = 0
1286         }
1287         if size <= 0 {
1288                 return
1289         }
1290         begin = pieceIndex(off / t.info.PieceLength)
1291         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1292         if end > pieceIndex(t.info.NumPieces()) {
1293                 end = pieceIndex(t.info.NumPieces())
1294         }
1295         return
1296 }
1297
1298 // Returns true if all iterations complete without breaking. Returns the read regions for all
1299 // readers. The reader regions should not be merged as some callers depend on this method to
1300 // enumerate readers.
1301 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1302         for r := range t.readers {
1303                 p := r.pieces
1304                 if p.begin >= p.end {
1305                         continue
1306                 }
1307                 if !f(p.begin, p.end) {
1308                         return false
1309                 }
1310         }
1311         return true
1312 }
1313
1314 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1315         return t.piece(piece).uncachedPriority()
1316 }
1317
1318 func (t *Torrent) pendRequest(req RequestIndex) {
1319         t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
1320 }
1321
1322 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1323         t.cl.event.Broadcast()
1324         if t.pieceComplete(piece) {
1325                 t.onPieceCompleted(piece)
1326         } else {
1327                 t.onIncompletePiece(piece)
1328         }
1329         t.updatePiecePriority(piece, reason)
1330 }
1331
1332 func (t *Torrent) numReceivedConns() (ret int) {
1333         for c := range t.conns {
1334                 if c.Discovery == PeerSourceIncoming {
1335                         ret++
1336                 }
1337         }
1338         return
1339 }
1340
1341 func (t *Torrent) numOutgoingConns() (ret int) {
1342         for c := range t.conns {
1343                 if c.outgoing {
1344                         ret++
1345                 }
1346         }
1347         return
1348 }
1349
1350 func (t *Torrent) maxHalfOpen() int {
1351         // Note that if we somehow exceed the maximum established conns, we want
1352         // the negative value to have an effect.
1353         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1354         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1355         // We want to allow some experimentation with new peers, and to try to
1356         // upset an oversupply of received connections.
1357         return int(min(
1358                 max(5, extraIncoming)+establishedHeadroom,
1359                 int64(t.cl.config.HalfOpenConnsPerTorrent),
1360         ))
1361 }
1362
1363 func (t *Torrent) openNewConns() (initiated int) {
1364         defer t.updateWantPeersEvent()
1365         for t.peers.Len() != 0 {
1366                 if !t.wantOutgoingConns() {
1367                         return
1368                 }
1369                 if len(t.halfOpen) >= t.maxHalfOpen() {
1370                         return
1371                 }
1372                 if len(t.cl.dialers) == 0 {
1373                         return
1374                 }
1375                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1376                         return
1377                 }
1378                 p := t.peers.PopMax()
1379                 t.initiateConn(p)
1380                 initiated++
1381         }
1382         return
1383 }
1384
1385 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1386         p := t.piece(piece)
1387         uncached := t.pieceCompleteUncached(piece)
1388         cached := p.completion()
1389         changed := cached != uncached
1390         complete := uncached.Complete
1391         p.storageCompletionOk = uncached.Ok
1392         x := uint32(piece)
1393         if complete {
1394                 t._completedPieces.Add(x)
1395                 t.openNewConns()
1396         } else {
1397                 t._completedPieces.Remove(x)
1398         }
1399         p.t.updatePieceRequestOrder(piece)
1400         t.updateComplete()
1401         if complete && len(p.dirtiers) != 0 {
1402                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1403         }
1404         if changed {
1405                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger)
1406                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1407         }
1408         return changed
1409 }
1410
1411 // Non-blocking read. Client lock is not required.
1412 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1413         for len(b) != 0 {
1414                 p := &t.pieces[off/t.info.PieceLength]
1415                 p.waitNoPendingWrites()
1416                 var n1 int
1417                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1418                 if n1 == 0 {
1419                         break
1420                 }
1421                 off += int64(n1)
1422                 n += n1
1423                 b = b[n1:]
1424         }
1425         return
1426 }
1427
1428 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1429 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1430 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1431 func (t *Torrent) maybeCompleteMetadata() error {
1432         if t.haveInfo() {
1433                 // Nothing to do.
1434                 return nil
1435         }
1436         if !t.haveAllMetadataPieces() {
1437                 // Don't have enough metadata pieces.
1438                 return nil
1439         }
1440         err := t.setInfoBytesLocked(t.metadataBytes)
1441         if err != nil {
1442                 t.invalidateMetadata()
1443                 return fmt.Errorf("error setting info bytes: %s", err)
1444         }
1445         if t.cl.config.Debug {
1446                 t.logger.Printf("%s: got metadata from peers", t)
1447         }
1448         return nil
1449 }
1450
1451 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1452         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1453                 if end > begin {
1454                         now.Add(bitmap.BitIndex(begin))
1455                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1456                 }
1457                 return true
1458         })
1459         return
1460 }
1461
1462 func (t *Torrent) needData() bool {
1463         if t.closed.IsSet() {
1464                 return false
1465         }
1466         if !t.haveInfo() {
1467                 return true
1468         }
1469         return !t._pendingPieces.IsEmpty()
1470 }
1471
1472 func appendMissingStrings(old, new []string) (ret []string) {
1473         ret = old
1474 new:
1475         for _, n := range new {
1476                 for _, o := range old {
1477                         if o == n {
1478                                 continue new
1479                         }
1480                 }
1481                 ret = append(ret, n)
1482         }
1483         return
1484 }
1485
1486 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1487         ret = existing
1488         for minNumTiers > len(ret) {
1489                 ret = append(ret, nil)
1490         }
1491         return
1492 }
1493
1494 func (t *Torrent) addTrackers(announceList [][]string) {
1495         fullAnnounceList := &t.metainfo.AnnounceList
1496         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1497         for tierIndex, trackerURLs := range announceList {
1498                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1499         }
1500         t.startMissingTrackerScrapers()
1501         t.updateWantPeersEvent()
1502 }
1503
1504 // Don't call this before the info is available.
1505 func (t *Torrent) bytesCompleted() int64 {
1506         if !t.haveInfo() {
1507                 return 0
1508         }
1509         return t.length() - t.bytesLeft()
1510 }
1511
1512 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1513         t.cl.lock()
1514         defer t.cl.unlock()
1515         return t.setInfoBytesLocked(b)
1516 }
1517
1518 // Returns true if connection is removed from torrent.Conns.
1519 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1520         if !c.closed.IsSet() {
1521                 panic("connection is not closed")
1522                 // There are behaviours prevented by the closed state that will fail
1523                 // if the connection has been deleted.
1524         }
1525         _, ret = t.conns[c]
1526         delete(t.conns, c)
1527         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1528         // the drop event against the PexConnState instead.
1529         if ret {
1530                 if !t.cl.config.DisablePEX {
1531                         t.pex.Drop(c)
1532                 }
1533         }
1534         torrent.Add("deleted connections", 1)
1535         c.deleteAllRequests("Torrent.deletePeerConn")
1536         t.assertPendingRequests()
1537         if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
1538                 panic(t.connsWithAllPieces)
1539         }
1540         return
1541 }
1542
1543 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1544         if t.deleteConnWithAllPieces(p) {
1545                 return
1546         }
1547         if !t.haveInfo() {
1548                 return
1549         }
1550         p.peerPieces().Iterate(func(i uint32) bool {
1551                 p.t.decPieceAvailability(pieceIndex(i))
1552                 return true
1553         })
1554 }
1555
1556 func (t *Torrent) assertPendingRequests() {
1557         if !check {
1558                 return
1559         }
1560         // var actual pendingRequests
1561         // if t.haveInfo() {
1562         //      actual.m = make([]int, t.numChunks())
1563         // }
1564         // t.iterPeers(func(p *Peer) {
1565         //      p.requestState.Requests.Iterate(func(x uint32) bool {
1566         //              actual.Inc(x)
1567         //              return true
1568         //      })
1569         // })
1570         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1571         // if diff != "" {
1572         //      panic(diff)
1573         // }
1574 }
1575
1576 func (t *Torrent) dropConnection(c *PeerConn) {
1577         t.cl.event.Broadcast()
1578         c.close()
1579         if t.deletePeerConn(c) {
1580                 t.openNewConns()
1581         }
1582 }
1583
1584 // Peers as in contact information for dialing out.
1585 func (t *Torrent) wantPeers() bool {
1586         if t.closed.IsSet() {
1587                 return false
1588         }
1589         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1590                 return false
1591         }
1592         return t.wantOutgoingConns()
1593 }
1594
1595 func (t *Torrent) updateWantPeersEvent() {
1596         if t.wantPeers() {
1597                 t.wantPeersEvent.Set()
1598         } else {
1599                 t.wantPeersEvent.Clear()
1600         }
1601 }
1602
1603 // Returns whether the client should make effort to seed the torrent.
1604 func (t *Torrent) seeding() bool {
1605         cl := t.cl
1606         if t.closed.IsSet() {
1607                 return false
1608         }
1609         if t.dataUploadDisallowed {
1610                 return false
1611         }
1612         if cl.config.NoUpload {
1613                 return false
1614         }
1615         if !cl.config.Seed {
1616                 return false
1617         }
1618         if cl.config.DisableAggressiveUpload && t.needData() {
1619                 return false
1620         }
1621         return true
1622 }
1623
1624 func (t *Torrent) onWebRtcConn(
1625         c datachannel.ReadWriteCloser,
1626         dcc webtorrent.DataChannelContext,
1627 ) {
1628         defer c.Close()
1629         netConn := webrtcNetConn{
1630                 ReadWriteCloser:    c,
1631                 DataChannelContext: dcc,
1632         }
1633         peerRemoteAddr := netConn.RemoteAddr()
1634         //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
1635         if t.cl.badPeerAddr(peerRemoteAddr) {
1636                 return
1637         }
1638         localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
1639         pc, err := t.cl.initiateProtocolHandshakes(
1640                 context.Background(),
1641                 netConn,
1642                 t,
1643                 false,
1644                 newConnectionOpts{
1645                         outgoing:        dcc.LocalOffered,
1646                         remoteAddr:      peerRemoteAddr,
1647                         localPublicAddr: localAddrIpPort,
1648                         network:         webrtcNetwork,
1649                         connString:      fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
1650                 },
1651         )
1652         if err != nil {
1653                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1654                 return
1655         }
1656         if dcc.LocalOffered {
1657                 pc.Discovery = PeerSourceTracker
1658         } else {
1659                 pc.Discovery = PeerSourceIncoming
1660         }
1661         pc.conn.SetWriteDeadline(time.Time{})
1662         t.cl.lock()
1663         defer t.cl.unlock()
1664         err = t.cl.runHandshookConn(pc, t)
1665         if err != nil {
1666                 t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
1667         }
1668 }
1669
1670 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1671         err := t.cl.runHandshookConn(pc, t)
1672         if err != nil || logAll {
1673                 t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
1674         }
1675 }
1676
1677 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1678         t.logRunHandshookConn(pc, false, log.Debug)
1679 }
1680
1681 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1682         wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash)
1683         // This needs to run before the Torrent is dropped from the Client, to prevent a new webtorrent.TrackerClient for
1684         // the same info hash before the old one is cleaned up.
1685         t.onClose = append(t.onClose, release)
1686         wst := websocketTrackerStatus{u, wtc}
1687         go func() {
1688                 err := wtc.Announce(tracker.Started, t.infoHash)
1689                 if err != nil {
1690                         t.logger.WithDefaultLevel(log.Warning).Printf(
1691                                 "error in initial announce to %q: %v",
1692                                 u.String(), err,
1693                         )
1694                 }
1695         }()
1696         return wst
1697 }
1698
1699 func (t *Torrent) startScrapingTracker(_url string) {
1700         if _url == "" {
1701                 return
1702         }
1703         u, err := url.Parse(_url)
1704         if err != nil {
1705                 // URLs with a leading '*' appear to be a uTorrent convention to
1706                 // disable trackers.
1707                 if _url[0] != '*' {
1708                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1709                 }
1710                 return
1711         }
1712         if u.Scheme == "udp" {
1713                 u.Scheme = "udp4"
1714                 t.startScrapingTracker(u.String())
1715                 u.Scheme = "udp6"
1716                 t.startScrapingTracker(u.String())
1717                 return
1718         }
1719         if _, ok := t.trackerAnnouncers[_url]; ok {
1720                 return
1721         }
1722         sl := func() torrentTrackerAnnouncer {
1723                 switch u.Scheme {
1724                 case "ws", "wss":
1725                         if t.cl.config.DisableWebtorrent {
1726                                 return nil
1727                         }
1728                         return t.startWebsocketAnnouncer(*u)
1729                 case "udp4":
1730                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1731                                 return nil
1732                         }
1733                 case "udp6":
1734                         if t.cl.config.DisableIPv6 {
1735                                 return nil
1736                         }
1737                 }
1738                 newAnnouncer := &trackerScraper{
1739                         u:               *u,
1740                         t:               t,
1741                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1742                 }
1743                 go newAnnouncer.Run()
1744                 return newAnnouncer
1745         }()
1746         if sl == nil {
1747                 return
1748         }
1749         if t.trackerAnnouncers == nil {
1750                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1751         }
1752         t.trackerAnnouncers[_url] = sl
1753 }
1754
1755 // Adds and starts tracker scrapers for tracker URLs that aren't already
1756 // running.
1757 func (t *Torrent) startMissingTrackerScrapers() {
1758         if t.cl.config.DisableTrackers {
1759                 return
1760         }
1761         t.startScrapingTracker(t.metainfo.Announce)
1762         for _, tier := range t.metainfo.AnnounceList {
1763                 for _, url := range tier {
1764                         t.startScrapingTracker(url)
1765                 }
1766         }
1767 }
1768
1769 // Returns an AnnounceRequest with fields filled out to defaults and current
1770 // values.
1771 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1772         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1773         // dependent on the network in use.
1774         return tracker.AnnounceRequest{
1775                 Event: event,
1776                 NumWant: func() int32 {
1777                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1778                                 return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764
1779                         } else {
1780                                 return 0
1781                         }
1782                 }(),
1783                 Port:     uint16(t.cl.incomingPeerPort()),
1784                 PeerId:   t.cl.peerID,
1785                 InfoHash: t.infoHash,
1786                 Key:      t.cl.announceKey(),
1787
1788                 // The following are vaguely described in BEP 3.
1789
1790                 Left:     t.bytesLeftAnnounce(),
1791                 Uploaded: t.stats.BytesWrittenData.Int64(),
1792                 // There's no mention of wasted or unwanted download in the BEP.
1793                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1794         }
1795 }
1796
1797 // Adds peers revealed in an announce until the announce ends, or we have
1798 // enough peers.
1799 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1800         cl := t.cl
1801         for v := range pvs {
1802                 cl.lock()
1803                 added := 0
1804                 for _, cp := range v.Peers {
1805                         if cp.Port == 0 {
1806                                 // Can't do anything with this.
1807                                 continue
1808                         }
1809                         if t.addPeer(PeerInfo{
1810                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1811                                 Source: PeerSourceDhtGetPeers,
1812                         }) {
1813                                 added++
1814                         }
1815                 }
1816                 cl.unlock()
1817                 // if added != 0 {
1818                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1819                 // }
1820         }
1821 }
1822
1823 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1824 // announce ends. stop will force the announce to end.
1825 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1826         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1827         if err != nil {
1828                 return
1829         }
1830         _done := make(chan struct{})
1831         done = _done
1832         stop = ps.Close
1833         go func() {
1834                 t.consumeDhtAnnouncePeers(ps.Peers())
1835                 close(_done)
1836         }()
1837         return
1838 }
1839
1840 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1841         _, stop, err := t.AnnounceToDht(s)
1842         if err != nil {
1843                 return err
1844         }
1845         select {
1846         case <-t.closed.Done():
1847         case <-time.After(5 * time.Minute):
1848         }
1849         stop()
1850         return nil
1851 }
1852
1853 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1854         cl := t.cl
1855         cl.lock()
1856         defer cl.unlock()
1857         for {
1858                 for {
1859                         if t.closed.IsSet() {
1860                                 return
1861                         }
1862                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
1863                         // TODO: We can include the announce_peer step depending on whether we can receive
1864                         // inbound connections. We should probably only announce once every 15 mins too.
1865                         if !t.wantAnyConns() {
1866                                 goto wait
1867                         }
1868                         // TODO: Determine if there's a listener on the port we're announcing.
1869                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1870                                 goto wait
1871                         }
1872                         break
1873                 wait:
1874                         cl.event.Wait()
1875                 }
1876                 func() {
1877                         t.numDHTAnnounces++
1878                         cl.unlock()
1879                         defer cl.lock()
1880                         err := t.timeboxedAnnounceToDht(s)
1881                         if err != nil {
1882                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1883                         }
1884                 }()
1885         }
1886 }
1887
1888 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1889         for _, p := range peers {
1890                 if t.addPeer(p) {
1891                         added++
1892                 }
1893         }
1894         return
1895 }
1896
1897 // The returned TorrentStats may require alignment in memory. See
1898 // https://github.com/anacrolix/torrent/issues/383.
1899 func (t *Torrent) Stats() TorrentStats {
1900         t.cl.rLock()
1901         defer t.cl.rUnlock()
1902         return t.statsLocked()
1903 }
1904
1905 func (t *Torrent) statsLocked() (ret TorrentStats) {
1906         ret.ActivePeers = len(t.conns)
1907         ret.HalfOpenPeers = len(t.halfOpen)
1908         ret.PendingPeers = t.peers.Len()
1909         ret.TotalPeers = t.numTotalPeers()
1910         ret.ConnectedSeeders = 0
1911         for c := range t.conns {
1912                 if all, ok := c.peerHasAllPieces(); all && ok {
1913                         ret.ConnectedSeeders++
1914                 }
1915         }
1916         ret.ConnStats = t.stats.Copy()
1917         ret.PiecesComplete = t.numPiecesCompleted()
1918         return
1919 }
1920
1921 // The total number of peers in the torrent.
1922 func (t *Torrent) numTotalPeers() int {
1923         peers := make(map[string]struct{})
1924         for conn := range t.conns {
1925                 ra := conn.conn.RemoteAddr()
1926                 if ra == nil {
1927                         // It's been closed and doesn't support RemoteAddr.
1928                         continue
1929                 }
1930                 peers[ra.String()] = struct{}{}
1931         }
1932         for addr := range t.halfOpen {
1933                 peers[addr] = struct{}{}
1934         }
1935         t.peers.Each(func(peer PeerInfo) {
1936                 peers[peer.Addr.String()] = struct{}{}
1937         })
1938         return len(peers)
1939 }
1940
1941 // Reconcile bytes transferred before connection was associated with a
1942 // torrent.
1943 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1944         if c._stats != (ConnStats{
1945                 // Handshakes should only increment these fields:
1946                 BytesWritten: c._stats.BytesWritten,
1947                 BytesRead:    c._stats.BytesRead,
1948         }) {
1949                 panic("bad stats")
1950         }
1951         c.postHandshakeStats(func(cs *ConnStats) {
1952                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1953                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1954         })
1955         c.reconciledHandshakeStats = true
1956 }
1957
1958 // Returns true if the connection is added.
1959 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1960         defer func() {
1961                 if err == nil {
1962                         torrent.Add("added connections", 1)
1963                 }
1964         }()
1965         if t.closed.IsSet() {
1966                 return errors.New("torrent closed")
1967         }
1968         for c0 := range t.conns {
1969                 if c.PeerID != c0.PeerID {
1970                         continue
1971                 }
1972                 if !t.cl.config.DropDuplicatePeerIds {
1973                         continue
1974                 }
1975                 if c.hasPreferredNetworkOver(c0) {
1976                         c0.close()
1977                         t.deletePeerConn(c0)
1978                 } else {
1979                         return errors.New("existing connection preferred")
1980                 }
1981         }
1982         if len(t.conns) >= t.maxEstablishedConns {
1983                 numOutgoing := t.numOutgoingConns()
1984                 numIncoming := len(t.conns) - numOutgoing
1985                 c := t.worstBadConn(worseConnLensOpts{
1986                         // We've already established that we have too many connections at this point, so we just
1987                         // need to match what kind we have too many of vs. what we're trying to add now.
1988                         incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
1989                         outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
1990                 })
1991                 if c == nil {
1992                         return errors.New("don't want conn")
1993                 }
1994                 c.close()
1995                 t.deletePeerConn(c)
1996         }
1997         if len(t.conns) >= t.maxEstablishedConns {
1998                 panic(len(t.conns))
1999         }
2000         t.conns[c] = struct{}{}
2001         t.cl.event.Broadcast()
2002         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
2003                 t.pex.Add(c) // as no further extended handshake expected
2004         }
2005         return nil
2006 }
2007
2008 func (t *Torrent) newConnsAllowed() bool {
2009         if !t.networkingEnabled.Bool() {
2010                 return false
2011         }
2012         if t.closed.IsSet() {
2013                 return false
2014         }
2015         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2016                 return false
2017         }
2018         return true
2019 }
2020
2021 func (t *Torrent) wantAnyConns() bool {
2022         if !t.networkingEnabled.Bool() {
2023                 return false
2024         }
2025         if t.closed.IsSet() {
2026                 return false
2027         }
2028         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
2029                 return false
2030         }
2031         return len(t.conns) < t.maxEstablishedConns
2032 }
2033
2034 func (t *Torrent) wantOutgoingConns() bool {
2035         if !t.newConnsAllowed() {
2036                 return false
2037         }
2038         if len(t.conns) < t.maxEstablishedConns {
2039                 return true
2040         }
2041         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2042         return t.worstBadConn(worseConnLensOpts{
2043                 incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
2044                 outgoingIsBad: false,
2045         }) != nil
2046 }
2047
2048 func (t *Torrent) wantIncomingConns() bool {
2049         if !t.newConnsAllowed() {
2050                 return false
2051         }
2052         if len(t.conns) < t.maxEstablishedConns {
2053                 return true
2054         }
2055         numIncomingConns := len(t.conns) - t.numOutgoingConns()
2056         return t.worstBadConn(worseConnLensOpts{
2057                 incomingIsBad: false,
2058                 outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
2059         }) != nil
2060 }
2061
2062 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
2063         t.cl.lock()
2064         defer t.cl.unlock()
2065         oldMax = t.maxEstablishedConns
2066         t.maxEstablishedConns = max
2067         wcs := worseConnSlice{
2068                 conns: t.appendConns(nil, func(*PeerConn) bool {
2069                         return true
2070                 }),
2071         }
2072         wcs.initKeys(worseConnLensOpts{})
2073         heap.Init(&wcs)
2074         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
2075                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
2076         }
2077         t.openNewConns()
2078         return oldMax
2079 }
2080
2081 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
2082         t.logger.LazyLog(log.Debug, func() log.Msg {
2083                 return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
2084         })
2085         p := t.piece(piece)
2086         p.numVerifies++
2087         t.cl.event.Broadcast()
2088         if t.closed.IsSet() {
2089                 return
2090         }
2091
2092         // Don't score the first time a piece is hashed, it could be an initial check.
2093         if p.storageCompletionOk {
2094                 if passed {
2095                         pieceHashedCorrect.Add(1)
2096                 } else {
2097                         log.Fmsg(
2098                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
2099                         ).AddValues(t, p).LogLevel(
2100
2101                                 log.Debug, t.logger)
2102
2103                         pieceHashedNotCorrect.Add(1)
2104                 }
2105         }
2106
2107         p.marking = true
2108         t.publishPieceChange(piece)
2109         defer func() {
2110                 p.marking = false
2111                 t.publishPieceChange(piece)
2112         }()
2113
2114         if passed {
2115                 if len(p.dirtiers) != 0 {
2116                         // Don't increment stats above connection-level for every involved connection.
2117                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
2118                 }
2119                 for c := range p.dirtiers {
2120                         c._stats.incrementPiecesDirtiedGood()
2121                 }
2122                 t.clearPieceTouchers(piece)
2123                 hasDirty := p.hasDirtyChunks()
2124                 t.cl.unlock()
2125                 if hasDirty {
2126                         p.Flush() // You can be synchronous here!
2127                 }
2128                 err := p.Storage().MarkComplete()
2129                 if err != nil {
2130                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
2131                 }
2132                 t.cl.lock()
2133
2134                 if t.closed.IsSet() {
2135                         return
2136                 }
2137                 t.pendAllChunkSpecs(piece)
2138         } else {
2139                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
2140                         // Peers contributed to all the data for this piece hash failure, and the failure was
2141                         // not due to errors in the storage (such as data being dropped in a cache).
2142
2143                         // Increment Torrent and above stats, and then specific connections.
2144                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
2145                         for c := range p.dirtiers {
2146                                 // Y u do dis peer?!
2147                                 c.stats().incrementPiecesDirtiedBad()
2148                         }
2149
2150                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
2151                         for c := range p.dirtiers {
2152                                 if !c.trusted {
2153                                         bannableTouchers = append(bannableTouchers, c)
2154                                 }
2155                         }
2156                         t.clearPieceTouchers(piece)
2157                         slices.Sort(bannableTouchers, connLessTrusted)
2158
2159                         if t.cl.config.Debug {
2160                                 t.logger.Printf(
2161                                         "bannable conns by trust for piece %d: %v",
2162                                         piece,
2163                                         func() (ret []connectionTrust) {
2164                                                 for _, c := range bannableTouchers {
2165                                                         ret = append(ret, c.trust())
2166                                                 }
2167                                                 return
2168                                         }(),
2169                                 )
2170                         }
2171
2172                         if len(bannableTouchers) >= 1 {
2173                                 c := bannableTouchers[0]
2174                                 if len(bannableTouchers) != 1 {
2175                                         t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
2176                                 } else {
2177                                         // Turns out it's still useful to ban peers like this because if there's only a
2178                                         // single peer for a piece, and we never progress that piece to completion, we
2179                                         // will never smart-ban them. Discovered in
2180                                         // https://github.com/anacrolix/torrent/issues/715.
2181                                         t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
2182                                         c.ban()
2183                                 }
2184                         }
2185                 }
2186                 t.onIncompletePiece(piece)
2187                 p.Storage().MarkNotComplete()
2188         }
2189         t.updatePieceCompletion(piece)
2190 }
2191
2192 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
2193         start := t.pieceRequestIndexOffset(piece)
2194         end := start + t.pieceNumChunks(piece)
2195         for ri := start; ri < end; ri++ {
2196                 t.cancelRequest(ri)
2197         }
2198 }
2199
2200 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
2201         t.pendAllChunkSpecs(piece)
2202         t.cancelRequestsForPiece(piece)
2203         t.piece(piece).readerCond.Broadcast()
2204         for conn := range t.conns {
2205                 conn.have(piece)
2206                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
2207         }
2208 }
2209
2210 // Called when a piece is found to be not complete.
2211 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2212         if t.pieceAllDirty(piece) {
2213                 t.pendAllChunkSpecs(piece)
2214         }
2215         if !t.wantPieceIndex(piece) {
2216                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2217                 return
2218         }
2219         // We could drop any connections that we told we have a piece that we
2220         // don't here. But there's a test failure, and it seems clients don't care
2221         // if you request pieces that you already claim to have. Pruning bad
2222         // connections might just remove any connections that aren't treating us
2223         // favourably anyway.
2224
2225         // for c := range t.conns {
2226         //      if c.sentHave(piece) {
2227         //              c.drop()
2228         //      }
2229         // }
2230         t.iterPeers(func(conn *Peer) {
2231                 if conn.peerHasPiece(piece) {
2232                         conn.updateRequests("piece incomplete")
2233                 }
2234         })
2235 }
2236
2237 func (t *Torrent) tryCreateMorePieceHashers() {
2238         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
2239         }
2240 }
2241
2242 func (t *Torrent) tryCreatePieceHasher() bool {
2243         if t.storage == nil {
2244                 return false
2245         }
2246         pi, ok := t.getPieceToHash()
2247         if !ok {
2248                 return false
2249         }
2250         p := t.piece(pi)
2251         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2252         p.hashing = true
2253         t.publishPieceChange(pi)
2254         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2255         t.storageLock.RLock()
2256         t.activePieceHashes++
2257         go t.pieceHasher(pi)
2258         return true
2259 }
2260
2261 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2262         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2263                 if t.piece(i).hashing {
2264                         return true
2265                 }
2266                 ret = i
2267                 ok = true
2268                 return false
2269         })
2270         return
2271 }
2272
2273 func (t *Torrent) dropBannedPeers() {
2274         t.iterPeers(func(p *Peer) {
2275                 remoteIp := p.remoteIp()
2276                 if remoteIp == nil {
2277                         if p.bannableAddr.Ok {
2278                                 t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
2279                         }
2280                         return
2281                 }
2282                 netipAddr := netip.MustParseAddr(remoteIp.String())
2283                 if Some(netipAddr) != p.bannableAddr {
2284                         t.logger.WithDefaultLevel(log.Debug).Printf(
2285                                 "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
2286                                 p, remoteIp, p.bannableAddr)
2287                 }
2288                 if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
2289                         // Should this be a close?
2290                         p.drop()
2291                         t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
2292                 }
2293         })
2294 }
2295
2296 func (t *Torrent) pieceHasher(index pieceIndex) {
2297         p := t.piece(index)
2298         sum, failedPeers, copyErr := t.hashPiece(index)
2299         correct := sum == *p.hash
2300         switch copyErr {
2301         case nil, io.EOF:
2302         default:
2303                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2304         }
2305         t.storageLock.RUnlock()
2306         t.cl.lock()
2307         defer t.cl.unlock()
2308         if correct {
2309                 for peer := range failedPeers {
2310                         t.cl.banPeerIP(peer.AsSlice())
2311                         t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
2312                 }
2313                 t.dropBannedPeers()
2314                 for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
2315                         t.smartBanCache.ForgetBlock(ri)
2316                 }
2317         }
2318         p.hashing = false
2319         t.pieceHashed(index, correct, copyErr)
2320         t.updatePiecePriority(index, "Torrent.pieceHasher")
2321         t.activePieceHashes--
2322         t.tryCreateMorePieceHashers()
2323 }
2324
2325 // Return the connections that touched a piece, and clear the entries while doing it.
2326 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2327         p := t.piece(pi)
2328         for c := range p.dirtiers {
2329                 delete(c.peerTouchedPieces, pi)
2330                 delete(p.dirtiers, c)
2331         }
2332 }
2333
2334 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2335         t.iterPeers(func(p *Peer) {
2336                 ret = append(ret, p)
2337         })
2338         return
2339 }
2340
2341 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2342         piece := t.piece(pieceIndex)
2343         if piece.queuedForHash() {
2344                 return
2345         }
2346         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2347         t.publishPieceChange(pieceIndex)
2348         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2349         t.tryCreateMorePieceHashers()
2350 }
2351
2352 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2353 // before the Info is available.
2354 func (t *Torrent) VerifyData() {
2355         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2356                 t.Piece(i).VerifyData()
2357         }
2358 }
2359
2360 // Start the process of connecting to the given peer for the given torrent if appropriate.
2361 func (t *Torrent) initiateConn(peer PeerInfo) {
2362         if peer.Id == t.cl.peerID {
2363                 return
2364         }
2365         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2366                 return
2367         }
2368         addr := peer.Addr
2369         if t.addrActive(addr.String()) {
2370                 return
2371         }
2372         t.cl.numHalfOpen++
2373         t.halfOpen[addr.String()] = peer
2374         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
2375 }
2376
2377 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2378 // quickly make one Client visible to the Torrent of another Client.
2379 func (t *Torrent) AddClientPeer(cl *Client) int {
2380         return t.AddPeers(func() (ps []PeerInfo) {
2381                 for _, la := range cl.ListenAddrs() {
2382                         ps = append(ps, PeerInfo{
2383                                 Addr:    la,
2384                                 Trusted: true,
2385                         })
2386                 }
2387                 return
2388         }())
2389 }
2390
2391 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2392 // connection.
2393 func (t *Torrent) allStats(f func(*ConnStats)) {
2394         f(&t.stats)
2395         f(&t.cl.stats)
2396 }
2397
2398 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2399         return t.pieces[i].hashing
2400 }
2401
2402 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2403         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2404 }
2405
2406 func (t *Torrent) dialTimeout() time.Duration {
2407         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2408 }
2409
2410 func (t *Torrent) piece(i int) *Piece {
2411         return &t.pieces[i]
2412 }
2413
2414 func (t *Torrent) onWriteChunkErr(err error) {
2415         if t.userOnWriteChunkErr != nil {
2416                 go t.userOnWriteChunkErr(err)
2417                 return
2418         }
2419         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2420         t.disallowDataDownloadLocked()
2421 }
2422
2423 func (t *Torrent) DisallowDataDownload() {
2424         t.disallowDataDownloadLocked()
2425 }
2426
2427 func (t *Torrent) disallowDataDownloadLocked() {
2428         t.dataDownloadDisallowed.Set()
2429 }
2430
2431 func (t *Torrent) AllowDataDownload() {
2432         t.dataDownloadDisallowed.Clear()
2433 }
2434
2435 // Enables uploading data, if it was disabled.
2436 func (t *Torrent) AllowDataUpload() {
2437         t.cl.lock()
2438         defer t.cl.unlock()
2439         t.dataUploadDisallowed = false
2440         for c := range t.conns {
2441                 c.updateRequests("allow data upload")
2442         }
2443 }
2444
2445 // Disables uploading data, if it was enabled.
2446 func (t *Torrent) DisallowDataUpload() {
2447         t.cl.lock()
2448         defer t.cl.unlock()
2449         t.dataUploadDisallowed = true
2450         for c := range t.conns {
2451                 // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
2452                 c.updateRequests("disallow data upload")
2453         }
2454 }
2455
2456 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2457 // or if nil, a critical message is logged, and data download is disabled.
2458 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2459         t.cl.lock()
2460         defer t.cl.unlock()
2461         t.userOnWriteChunkErr = f
2462 }
2463
2464 func (t *Torrent) iterPeers(f func(p *Peer)) {
2465         for pc := range t.conns {
2466                 f(&pc.Peer)
2467         }
2468         for _, ws := range t.webSeeds {
2469                 f(ws)
2470         }
2471 }
2472
2473 func (t *Torrent) callbacks() *Callbacks {
2474         return &t.cl.config.Callbacks
2475 }
2476
2477 type AddWebSeedsOpt func(*webseed.Client)
2478
2479 // Sets the WebSeed trailing path escaper for a webseed.Client.
2480 func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
2481         return func(c *webseed.Client) {
2482                 c.PathEscaper = custom
2483         }
2484 }
2485
2486 func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
2487         t.cl.lock()
2488         defer t.cl.unlock()
2489         for _, u := range urls {
2490                 t.addWebSeed(u, opts...)
2491         }
2492 }
2493
2494 func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
2495         if t.cl.config.DisableWebseeds {
2496                 return
2497         }
2498         if _, ok := t.webSeeds[url]; ok {
2499                 return
2500         }
2501         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2502         // right away. This value should be some multiple of the number of connections to a host. I
2503         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2504         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2505         // "https://webtorrent.io/torrents/".
2506         const maxRequests = 16
2507         ws := webseedPeer{
2508                 peer: Peer{
2509                         t:                        t,
2510                         outgoing:                 true,
2511                         Network:                  "http",
2512                         reconciledHandshakeStats: true,
2513                         // This should affect how often we have to recompute requests for this peer. Note that
2514                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2515                         // requests mark more often, so recomputation is probably sooner than with regular peer
2516                         // conns. ~4x maxRequests would be about right.
2517                         PeerMaxRequests: 128,
2518                         // TODO: Set ban prefix?
2519                         RemoteAddr: remoteAddrFromUrl(url),
2520                         callbacks:  t.callbacks(),
2521                 },
2522                 client: webseed.Client{
2523                         HttpClient: t.cl.httpClient,
2524                         Url:        url,
2525                         ResponseBodyWrapper: func(r io.Reader) io.Reader {
2526                                 return &rateLimitedReader{
2527                                         l: t.cl.config.DownloadRateLimiter,
2528                                         r: r,
2529                                 }
2530                         },
2531                 },
2532                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2533         }
2534         ws.peer.initRequestState()
2535         for _, opt := range opts {
2536                 opt(&ws.client)
2537         }
2538         ws.peer.initUpdateRequestsTimer()
2539         ws.requesterCond.L = t.cl.locker()
2540         for i := 0; i < maxRequests; i += 1 {
2541                 go ws.requester(i)
2542         }
2543         for _, f := range t.callbacks().NewPeer {
2544                 f(&ws.peer)
2545         }
2546         ws.peer.logger = t.logger.WithContextValue(&ws)
2547         ws.peer.peerImpl = &ws
2548         if t.haveInfo() {
2549                 ws.onGotInfo(t.info)
2550         }
2551         t.webSeeds[url] = &ws.peer
2552 }
2553
2554 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2555         t.iterPeers(func(p1 *Peer) {
2556                 if p1 == p {
2557                         active = true
2558                 }
2559         })
2560         return
2561 }
2562
2563 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2564         index := t.pieceIndexOfRequestIndex(ri)
2565         return Request{
2566                 pp.Integer(index),
2567                 t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2568         }
2569 }
2570
2571 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2572         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
2573 }
2574
2575 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2576         return RequestIndex(piece) * t.chunksPerRegularPiece()
2577 }
2578
2579 func (t *Torrent) updateComplete() {
2580         t.Complete.SetBool(t.haveAllPieces())
2581 }
2582
2583 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
2584         p := t.requestingPeer(r)
2585         if p != nil {
2586                 p.cancel(r)
2587         }
2588         // TODO: This is a check that an old invariant holds. It can be removed after some testing.
2589         //delete(t.pendingRequests, r)
2590         if _, ok := t.requestState[r]; ok {
2591                 panic("expected request state to be gone")
2592         }
2593         return p
2594 }
2595
2596 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
2597         return t.requestState[r].peer
2598 }
2599
2600 func (t *Torrent) addConnWithAllPieces(p *Peer) {
2601         if t.connsWithAllPieces == nil {
2602                 t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
2603         }
2604         t.connsWithAllPieces[p] = struct{}{}
2605 }
2606
2607 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
2608         _, ok := t.connsWithAllPieces[p]
2609         delete(t.connsWithAllPieces, p)
2610         return ok
2611 }
2612
2613 func (t *Torrent) numActivePeers() int {
2614         return len(t.conns) + len(t.webSeeds)
2615 }
2616
2617 func (t *Torrent) hasStorageCap() bool {
2618         f := t.storage.Capacity
2619         if f == nil {
2620                 return false
2621         }
2622         _, ok := (*f)()
2623         return ok
2624 }
2625
2626 func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
2627         return pieceIndex(ri / t.chunksPerRegularPiece())
2628 }
2629
2630 func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
2631         reuseIter *typedRoaring.Iterator[RequestIndex],
2632         piece pieceIndex,
2633         f func(RequestIndex),
2634 ) {
2635         reuseIter.Initialize(&t.dirtyChunks)
2636         pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
2637         iterBitmapUnsetInRange(
2638                 reuseIter,
2639                 pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
2640                 f,
2641         )
2642 }
2643
2644 type requestState struct {
2645         peer *Peer
2646         when time.Time
2647 }
2648
2649 // Returns an error if a received chunk is out of bounds in someway.
2650 func (t *Torrent) checkValidReceiveChunk(r Request) error {
2651         if !t.haveInfo() {
2652                 return errors.New("torrent missing info")
2653         }
2654         if int(r.Index) >= t.numPieces() {
2655                 return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
2656         }
2657         pieceLength := t.pieceLength(pieceIndex(r.Index))
2658         if r.Begin >= pieceLength {
2659                 return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
2660         }
2661         // We could check chunk lengths here, but chunk request size is not changed often, and tricky
2662         // for peers to manipulate as they need to send potentially large buffers to begin with. There
2663         // should be considerable checks elsewhere for this case due to the network overhead. We should
2664         // catch most of the overflow manipulation stuff by checking index and begin above.
2665         return nil
2666 }