]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Remove redundant type conversions
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "net/netip"
12         "net/url"
13         "sort"
14         "strings"
15         "text/tabwriter"
16         "time"
17         "unsafe"
18
19         "github.com/RoaringBitmap/roaring"
20         "github.com/anacrolix/chansync"
21         "github.com/anacrolix/chansync/events"
22         "github.com/anacrolix/dht/v2"
23         . "github.com/anacrolix/generics"
24         "github.com/anacrolix/log"
25         "github.com/anacrolix/missinggo/perf"
26         "github.com/anacrolix/missinggo/slices"
27         "github.com/anacrolix/missinggo/v2"
28         "github.com/anacrolix/missinggo/v2/bitmap"
29         "github.com/anacrolix/missinggo/v2/pubsub"
30         "github.com/anacrolix/multiless"
31         "github.com/anacrolix/sync"
32         request_strategy "github.com/anacrolix/torrent/request-strategy"
33         typedRoaring "github.com/anacrolix/torrent/typed-roaring"
34         "github.com/davecgh/go-spew/spew"
35         "github.com/pion/datachannel"
36
37         "github.com/anacrolix/torrent/bencode"
38         "github.com/anacrolix/torrent/common"
39         "github.com/anacrolix/torrent/metainfo"
40         pp "github.com/anacrolix/torrent/peer_protocol"
41         "github.com/anacrolix/torrent/segments"
42         "github.com/anacrolix/torrent/storage"
43         "github.com/anacrolix/torrent/tracker"
44         "github.com/anacrolix/torrent/webseed"
45         "github.com/anacrolix/torrent/webtorrent"
46 )
47
48 // Maintains state of torrent within a Client. Many methods should not be called before the info is
49 // available, see .Info and .GotInfo.
50 type Torrent struct {
51         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
52         // alignment. See #262.
53         stats  ConnStats
54         cl     *Client
55         logger log.Logger
56
57         networkingEnabled      chansync.Flag
58         dataDownloadDisallowed chansync.Flag
59         dataUploadDisallowed   bool
60         userOnWriteChunkErr    func(error)
61
62         closed   chansync.SetOnce
63         infoHash metainfo.Hash
64         pieces   []Piece
65         // Values are the piece indices that changed.
66         pieceStateChanges pubsub.PubSub[PieceStateChange]
67         // The size of chunks to request from peers over the wire. This is
68         // normally 16KiB by convention these days.
69         chunkSize pp.Integer
70         chunkPool sync.Pool
71         // Total length of the torrent in bytes. Stored because it's not O(1) to
72         // get this from the info dict.
73         length *int64
74
75         // The storage to open when the info dict becomes available.
76         storageOpener *storage.Client
77         // Storage for torrent data.
78         storage *storage.Torrent
79         // Read-locked for using storage, and write-locked for Closing.
80         storageLock sync.RWMutex
81
82         // TODO: Only announce stuff is used?
83         metainfo metainfo.MetaInfo
84
85         // The info dict. nil if we don't have it (yet).
86         info      *metainfo.Info
87         fileIndex segments.Index
88         files     *[]*File
89
90         webSeeds map[string]*Peer
91         // Active peer connections, running message stream loops. TODO: Make this
92         // open (not-closed) connections only.
93         conns               map[*PeerConn]struct{}
94         maxEstablishedConns int
95         // Set of addrs to which we're attempting to connect. Connections are
96         // half-open until all handshakes are completed.
97         halfOpen map[string]PeerInfo
98
99         // Reserve of peers to connect to. A peer can be both here and in the
100         // active connections if were told about the peer after connecting with
101         // them. That encourages us to reconnect to peers that are well known in
102         // the swarm.
103         peers prioritizedPeers
104         // Whether we want to know to know more peers.
105         wantPeersEvent missinggo.Event
106         // An announcer for each tracker URL.
107         trackerAnnouncers map[string]torrentTrackerAnnouncer
108         // How many times we've initiated a DHT announce. TODO: Move into stats.
109         numDHTAnnounces int
110
111         // Name used if the info name isn't available. Should be cleared when the
112         // Info does become available.
113         nameMu      sync.RWMutex
114         displayName string
115
116         // The bencoded bytes of the info dict. This is actively manipulated if
117         // the info bytes aren't initially available, and we try to fetch them
118         // from peers.
119         metadataBytes []byte
120         // Each element corresponds to the 16KiB metadata pieces. If true, we have
121         // received that piece.
122         metadataCompletedChunks []bool
123         metadataChanged         sync.Cond
124
125         // Closed when .Info is obtained.
126         gotMetainfoC chan struct{}
127
128         readers                map[*reader]struct{}
129         _readerNowPieces       bitmap.Bitmap
130         _readerReadaheadPieces bitmap.Bitmap
131
132         // A cache of pieces we need to get. Calculated from various piece and
133         // file priorities and completion states elsewhere.
134         _pendingPieces roaring.Bitmap
135         // A cache of completed piece indices.
136         _completedPieces roaring.Bitmap
137         // Pieces that need to be hashed.
138         piecesQueuedForHash       bitmap.Bitmap
139         activePieceHashes         int
140         initialPieceCheckDisabled bool
141
142         connsWithAllPieces map[*Peer]struct{}
143         // Count of each request across active connections.
144         pendingRequests map[RequestIndex]*Peer
145         lastRequested   map[RequestIndex]time.Time
146         // Chunks we've written to since the corresponding piece was last checked.
147         dirtyChunks typedRoaring.Bitmap[RequestIndex]
148
149         pex pexState
150
151         // Is On when all pieces are complete.
152         Complete chansync.Flag
153
154         // Torrent sources in use keyed by the source string.
155         activeSources sync.Map
156         sourcesLogger log.Logger
157
158         smartBanCache smartBanCache
159 }
160
161 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
162         // This could be done with roaring.BitSliceIndexing.
163         t.iterPeers(func(peer *Peer) {
164                 if _, ok := t.connsWithAllPieces[peer]; ok {
165                         return
166                 }
167                 if peer.peerHasPiece(i) {
168                         count++
169                 }
170         })
171         return
172 }
173
174 func (t *Torrent) decPieceAvailability(i pieceIndex) {
175         if !t.haveInfo() {
176                 return
177         }
178         p := t.piece(i)
179         if p.relativeAvailability <= 0 {
180                 panic(p.relativeAvailability)
181         }
182         p.relativeAvailability--
183         t.updatePieceRequestOrder(i)
184 }
185
186 func (t *Torrent) incPieceAvailability(i pieceIndex) {
187         // If we don't the info, this should be reconciled when we do.
188         if t.haveInfo() {
189                 p := t.piece(i)
190                 p.relativeAvailability++
191                 t.updatePieceRequestOrder(i)
192         }
193 }
194
195 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
196         return t._readerNowPieces
197 }
198
199 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
200         return t._readerReadaheadPieces
201 }
202
203 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
204         return !t.wantPieceIndex(i)
205 }
206
207 // Returns a channel that is closed when the Torrent is closed.
208 func (t *Torrent) Closed() events.Done {
209         return t.closed.Done()
210 }
211
212 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
213 // pending, and half-open peers.
214 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
215         // Add pending peers to the list
216         t.peers.Each(func(peer PeerInfo) {
217                 ks = append(ks, peer)
218         })
219
220         // Add half-open peers to the list
221         for _, peer := range t.halfOpen {
222                 ks = append(ks, peer)
223         }
224
225         // Add active peers to the list
226         for conn := range t.conns {
227                 ks = append(ks, PeerInfo{
228                         Id:     conn.PeerID,
229                         Addr:   conn.RemoteAddr,
230                         Source: conn.Discovery,
231                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
232                         // > But if we're not connected to them with an encrypted connection, I couldn't say
233                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
234                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
235                         // > connection. It's probably easiest to do the latter for now.
236                         // https://github.com/anacrolix/torrent/pull/188
237                         SupportsEncryption: conn.headerEncrypted,
238                 })
239         }
240
241         return
242 }
243
244 func (t *Torrent) setChunkSize(size pp.Integer) {
245         t.chunkSize = size
246         t.chunkPool = sync.Pool{
247                 New: func() interface{} {
248                         b := make([]byte, size)
249                         return &b
250                 },
251         }
252 }
253
254 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
255         return t._completedPieces.Contains(bitmap.BitIndex(piece))
256 }
257
258 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
259         if t.storage == nil {
260                 return storage.Completion{Complete: false, Ok: true}
261         }
262         return t.pieces[piece].Storage().Completion()
263 }
264
265 // There's a connection to that address already.
266 func (t *Torrent) addrActive(addr string) bool {
267         if _, ok := t.halfOpen[addr]; ok {
268                 return true
269         }
270         for c := range t.conns {
271                 ra := c.RemoteAddr
272                 if ra.String() == addr {
273                         return true
274                 }
275         }
276         return false
277 }
278
279 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
280         return t.appendConns(ret, func(conn *PeerConn) bool {
281                 return !conn.closed.IsSet()
282         })
283 }
284
285 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
286         for c := range t.conns {
287                 if f(c) {
288                         ret = append(ret, c)
289                 }
290         }
291         return ret
292 }
293
294 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
295         cl := t.cl
296         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
297         if t.closed.IsSet() {
298                 return false
299         }
300         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
301                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
302                         torrent.Add("peers not added because of bad addr", 1)
303                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
304                         return false
305                 }
306         }
307         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
308                 torrent.Add("peers replaced", 1)
309                 if !replaced.equal(p) {
310                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
311                         added = true
312                 }
313         } else {
314                 added = true
315         }
316         t.openNewConns()
317         for t.peers.Len() > cl.config.TorrentPeersHighWater {
318                 _, ok := t.peers.DeleteMin()
319                 if ok {
320                         torrent.Add("excess reserve peers discarded", 1)
321                 }
322         }
323         return
324 }
325
326 func (t *Torrent) invalidateMetadata() {
327         for i := 0; i < len(t.metadataCompletedChunks); i++ {
328                 t.metadataCompletedChunks[i] = false
329         }
330         t.nameMu.Lock()
331         t.gotMetainfoC = make(chan struct{})
332         t.info = nil
333         t.nameMu.Unlock()
334 }
335
336 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
337         if t.haveInfo() {
338                 return
339         }
340         if index >= len(t.metadataCompletedChunks) {
341                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
342                 return
343         }
344         copy(t.metadataBytes[(1<<14)*index:], data)
345         t.metadataCompletedChunks[index] = true
346 }
347
348 func (t *Torrent) metadataPieceCount() int {
349         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
350 }
351
352 func (t *Torrent) haveMetadataPiece(piece int) bool {
353         if t.haveInfo() {
354                 return (1<<14)*piece < len(t.metadataBytes)
355         } else {
356                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
357         }
358 }
359
360 func (t *Torrent) metadataSize() int {
361         return len(t.metadataBytes)
362 }
363
364 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
365         for i := 0; i < len(info.Pieces); i += sha1.Size {
366                 ret = append(ret, info.Pieces[i:i+sha1.Size])
367         }
368         return
369 }
370
371 func (t *Torrent) makePieces() {
372         hashes := infoPieceHashes(t.info)
373         t.pieces = make([]Piece, len(hashes))
374         for i, hash := range hashes {
375                 piece := &t.pieces[i]
376                 piece.t = t
377                 piece.index = pieceIndex(i)
378                 piece.noPendingWrites.L = &piece.pendingWritesMutex
379                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
380                 files := *t.files
381                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
382                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
383                 piece.files = files[beginFile:endFile]
384                 piece.undirtiedChunksIter = undirtiedChunksIter{
385                         TorrentDirtyChunks: &t.dirtyChunks,
386                         StartRequestIndex:  piece.requestIndexOffset(),
387                         EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
388                 }
389         }
390 }
391
392 // Returns the index of the first file containing the piece. files must be
393 // ordered by offset.
394 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
395         for i, f := range files {
396                 if f.offset+f.length > pieceOffset {
397                         return i
398                 }
399         }
400         return 0
401 }
402
403 // Returns the index after the last file containing the piece. files must be
404 // ordered by offset.
405 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
406         for i, f := range files {
407                 if f.offset+f.length >= pieceEndOffset {
408                         return i + 1
409                 }
410         }
411         return 0
412 }
413
414 func (t *Torrent) cacheLength() {
415         var l int64
416         for _, f := range t.info.UpvertedFiles() {
417                 l += f.Length
418         }
419         t.length = &l
420 }
421
422 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
423 // separately.
424 func (t *Torrent) setInfo(info *metainfo.Info) error {
425         if err := validateInfo(info); err != nil {
426                 return fmt.Errorf("bad info: %s", err)
427         }
428         if t.storageOpener != nil {
429                 var err error
430                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
431                 if err != nil {
432                         return fmt.Errorf("error opening torrent storage: %s", err)
433                 }
434         }
435         t.nameMu.Lock()
436         t.info = info
437         t.nameMu.Unlock()
438         t.updateComplete()
439         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
440         t.displayName = "" // Save a few bytes lol.
441         t.initFiles()
442         t.cacheLength()
443         t.makePieces()
444         return nil
445 }
446
447 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
448         return request_strategy.PieceRequestOrderKey{
449                 InfoHash: t.infoHash,
450                 Index:    i,
451         }
452 }
453
454 // This seems to be all the follow-up tasks after info is set, that can't fail.
455 func (t *Torrent) onSetInfo() {
456         t.initPieceRequestOrder()
457         for i := range t.pieces {
458                 p := &t.pieces[i]
459                 // Need to add relativeAvailability before updating piece completion, as that may result in conns
460                 // being dropped.
461                 if p.relativeAvailability != 0 {
462                         panic(p.relativeAvailability)
463                 }
464                 p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
465                 t.addRequestOrderPiece(i)
466                 t.updatePieceCompletion(i)
467                 if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
468                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
469                         t.queuePieceCheck(i)
470                 }
471         }
472         t.cl.event.Broadcast()
473         close(t.gotMetainfoC)
474         t.updateWantPeersEvent()
475         t.pendingRequests = make(map[RequestIndex]*Peer)
476         t.lastRequested = make(map[RequestIndex]time.Time)
477         t.tryCreateMorePieceHashers()
478         t.iterPeers(func(p *Peer) {
479                 p.onGotInfo(t.info)
480                 p.updateRequests("onSetInfo")
481         })
482 }
483
484 // Called when metadata for a torrent becomes available.
485 func (t *Torrent) setInfoBytesLocked(b []byte) error {
486         if metainfo.HashBytes(b) != t.infoHash {
487                 return errors.New("info bytes have wrong hash")
488         }
489         var info metainfo.Info
490         if err := bencode.Unmarshal(b, &info); err != nil {
491                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
492         }
493         t.metadataBytes = b
494         t.metadataCompletedChunks = nil
495         if t.info != nil {
496                 return nil
497         }
498         if err := t.setInfo(&info); err != nil {
499                 return err
500         }
501         t.onSetInfo()
502         return nil
503 }
504
505 func (t *Torrent) haveAllMetadataPieces() bool {
506         if t.haveInfo() {
507                 return true
508         }
509         if t.metadataCompletedChunks == nil {
510                 return false
511         }
512         for _, have := range t.metadataCompletedChunks {
513                 if !have {
514                         return false
515                 }
516         }
517         return true
518 }
519
520 // TODO: Propagate errors to disconnect peer.
521 func (t *Torrent) setMetadataSize(size int) (err error) {
522         if t.haveInfo() {
523                 // We already know the correct metadata size.
524                 return
525         }
526         if uint32(size) > maxMetadataSize {
527                 return errors.New("bad size")
528         }
529         if len(t.metadataBytes) == size {
530                 return
531         }
532         t.metadataBytes = make([]byte, size)
533         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
534         t.metadataChanged.Broadcast()
535         for c := range t.conns {
536                 c.requestPendingMetadata()
537         }
538         return
539 }
540
541 // The current working name for the torrent. Either the name in the info dict,
542 // or a display name given such as by the dn value in a magnet link, or "".
543 func (t *Torrent) name() string {
544         t.nameMu.RLock()
545         defer t.nameMu.RUnlock()
546         if t.haveInfo() {
547                 return t.info.BestName()
548         }
549         if t.displayName != "" {
550                 return t.displayName
551         }
552         return "infohash:" + t.infoHash.HexString()
553 }
554
555 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
556         p := &t.pieces[index]
557         ret.Priority = t.piecePriority(index)
558         ret.Completion = p.completion()
559         ret.QueuedForHash = p.queuedForHash()
560         ret.Hashing = p.hashing
561         ret.Checking = ret.QueuedForHash || ret.Hashing
562         ret.Marking = p.marking
563         if !ret.Complete && t.piecePartiallyDownloaded(index) {
564                 ret.Partial = true
565         }
566         return
567 }
568
569 func (t *Torrent) metadataPieceSize(piece int) int {
570         return metadataPieceSize(len(t.metadataBytes), piece)
571 }
572
573 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
574         return pp.Message{
575                 Type:       pp.Extended,
576                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
577                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
578                         Piece:     piece,
579                         TotalSize: len(t.metadataBytes),
580                         Type:      msgType,
581                 }), data...),
582         }
583 }
584
585 type pieceAvailabilityRun struct {
586         Count        pieceIndex
587         Availability int
588 }
589
590 func (me pieceAvailabilityRun) String() string {
591         return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
592 }
593
594 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
595         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
596                 ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
597         })
598         for i := range t.pieces {
599                 rle.Append(t.pieces[i].availability(), 1)
600         }
601         rle.Flush()
602         return
603 }
604
605 func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
606         freqs = make([]int, t.numActivePeers()+1)
607         for i := range t.pieces {
608                 freqs[t.piece(i).availability()]++
609         }
610         return
611 }
612
613 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
614         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
615                 ret = append(ret, PieceStateRun{
616                         PieceState: el.(PieceState),
617                         Length:     int(count),
618                 })
619         })
620         for index := range t.pieces {
621                 rle.Append(t.pieceState(pieceIndex(index)), 1)
622         }
623         rle.Flush()
624         return
625 }
626
627 // Produces a small string representing a PieceStateRun.
628 func (psr PieceStateRun) String() (ret string) {
629         ret = fmt.Sprintf("%d", psr.Length)
630         ret += func() string {
631                 switch psr.Priority {
632                 case PiecePriorityNext:
633                         return "N"
634                 case PiecePriorityNormal:
635                         return "."
636                 case PiecePriorityReadahead:
637                         return "R"
638                 case PiecePriorityNow:
639                         return "!"
640                 case PiecePriorityHigh:
641                         return "H"
642                 default:
643                         return ""
644                 }
645         }()
646         if psr.Hashing {
647                 ret += "H"
648         }
649         if psr.QueuedForHash {
650                 ret += "Q"
651         }
652         if psr.Marking {
653                 ret += "M"
654         }
655         if psr.Partial {
656                 ret += "P"
657         }
658         if psr.Complete {
659                 ret += "C"
660         }
661         if !psr.Ok {
662                 ret += "?"
663         }
664         return
665 }
666
667 func (t *Torrent) writeStatus(w io.Writer) {
668         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
669         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
670         if !t.haveInfo() {
671                 fmt.Fprintf(w, "Metadata have: ")
672                 for _, h := range t.metadataCompletedChunks {
673                         fmt.Fprintf(w, "%c", func() rune {
674                                 if h {
675                                         return 'H'
676                                 } else {
677                                         return '.'
678                                 }
679                         }())
680                 }
681                 fmt.Fprintln(w)
682         }
683         fmt.Fprintf(w, "Piece length: %s\n",
684                 func() string {
685                         if t.haveInfo() {
686                                 return fmt.Sprintf("%v (%v chunks)",
687                                         t.usualPieceSize(),
688                                         float64(t.usualPieceSize())/float64(t.chunkSize))
689                         } else {
690                                 return "no info"
691                         }
692                 }(),
693         )
694         if t.info != nil {
695                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
696                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
697                 // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
698                 // availability frequencies instead.
699                 if false {
700                         fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
701                                 for _, run := range t.pieceAvailabilityRuns() {
702                                         ret = append(ret, run.String())
703                                 }
704                                 return
705                         }(), " "))
706                 }
707                 fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
708                         func() (ret []string) {
709                                 for avail, freq := range t.pieceAvailabilityFrequencies() {
710                                         if freq == 0 {
711                                                 continue
712                                         }
713                                         ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
714                                 }
715                                 return
716                         }(),
717                         ", "))
718         }
719         fmt.Fprintf(w, "Reader Pieces:")
720         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
721                 fmt.Fprintf(w, " %d:%d", begin, end)
722                 return true
723         })
724         fmt.Fprintln(w)
725
726         fmt.Fprintf(w, "Enabled trackers:\n")
727         func() {
728                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
729                 fmt.Fprintf(tw, "    URL\tExtra\n")
730                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
731                         lu := l.URL()
732                         ru := r.URL()
733                         var luns, runs url.URL = *lu, *ru
734                         luns.Scheme = ""
735                         runs.Scheme = ""
736                         var ml missinggo.MultiLess
737                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
738                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
739                         return ml.Less()
740                 }).([]torrentTrackerAnnouncer) {
741                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
742                 }
743                 tw.Flush()
744         }()
745
746         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
747
748         spew.NewDefaultConfig()
749         spew.Fdump(w, t.statsLocked())
750
751         peers := t.peersAsSlice()
752         sort.Slice(peers, func(_i, _j int) bool {
753                 i := peers[_i]
754                 j := peers[_j]
755                 if less, ok := multiless.New().EagerSameLess(
756                         i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
757                 ).LessOk(); ok {
758                         return less
759                 }
760                 return worseConn(i, j)
761         })
762         for i, c := range peers {
763                 fmt.Fprintf(w, "%2d. ", i+1)
764                 c.writeStatus(w, t)
765         }
766 }
767
768 func (t *Torrent) haveInfo() bool {
769         return t.info != nil
770 }
771
772 // Returns a run-time generated MetaInfo that includes the info bytes and
773 // announce-list as currently known to the client.
774 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
775         return metainfo.MetaInfo{
776                 CreationDate: time.Now().Unix(),
777                 Comment:      "dynamic metainfo from client",
778                 CreatedBy:    "go.torrent",
779                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
780                 InfoBytes: func() []byte {
781                         if t.haveInfo() {
782                                 return t.metadataBytes
783                         } else {
784                                 return nil
785                         }
786                 }(),
787                 UrlList: func() []string {
788                         ret := make([]string, 0, len(t.webSeeds))
789                         for url := range t.webSeeds {
790                                 ret = append(ret, url)
791                         }
792                         return ret
793                 }(),
794         }
795 }
796
797 // Get bytes left
798 func (t *Torrent) BytesMissing() (n int64) {
799         t.cl.rLock()
800         n = t.bytesMissingLocked()
801         t.cl.rUnlock()
802         return
803 }
804
805 func (t *Torrent) bytesMissingLocked() int64 {
806         return t.bytesLeft()
807 }
808
809 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
810         roaring.Flip(b, 0, end).Iterate(cb)
811 }
812
813 func (t *Torrent) bytesLeft() (left int64) {
814         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
815                 p := t.piece(pieceIndex(x))
816                 left += int64(p.length() - p.numDirtyBytes())
817                 return true
818         })
819         return
820 }
821
822 // Bytes left to give in tracker announces.
823 func (t *Torrent) bytesLeftAnnounce() int64 {
824         if t.haveInfo() {
825                 return t.bytesLeft()
826         } else {
827                 return -1
828         }
829 }
830
831 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
832         if t.pieceComplete(piece) {
833                 return false
834         }
835         if t.pieceAllDirty(piece) {
836                 return false
837         }
838         return t.pieces[piece].hasDirtyChunks()
839 }
840
841 func (t *Torrent) usualPieceSize() int {
842         return int(t.info.PieceLength)
843 }
844
845 func (t *Torrent) numPieces() pieceIndex {
846         return pieceIndex(t.info.NumPieces())
847 }
848
849 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
850         return pieceIndex(t._completedPieces.GetCardinality())
851 }
852
853 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
854         if !t.closed.Set() {
855                 err = errors.New("already closed")
856                 return
857         }
858         if t.storage != nil {
859                 wg.Add(1)
860                 go func() {
861                         defer wg.Done()
862                         t.storageLock.Lock()
863                         defer t.storageLock.Unlock()
864                         if f := t.storage.Close; f != nil {
865                                 err1 := f()
866                                 if err1 != nil {
867                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
868                                 }
869                         }
870                 }()
871         }
872         t.iterPeers(func(p *Peer) {
873                 p.close()
874         })
875         if t.storage != nil {
876                 t.deletePieceRequestOrder()
877         }
878         for i := range t.pieces {
879                 p := t.piece(i)
880                 if p.relativeAvailability != 0 {
881                         panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
882                 }
883         }
884         t.pex.Reset()
885         t.cl.event.Broadcast()
886         t.pieceStateChanges.Close()
887         t.updateWantPeersEvent()
888         return
889 }
890
891 func (t *Torrent) requestOffset(r Request) int64 {
892         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
893 }
894
895 // Return the request that would include the given offset into the torrent data. Returns !ok if
896 // there is no such request.
897 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
898         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
899 }
900
901 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
902         defer perf.ScopeTimerErr(&err)()
903         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
904         if err == nil && n != len(data) {
905                 err = io.ErrShortWrite
906         }
907         return err
908 }
909
910 func (t *Torrent) bitfield() (bf []bool) {
911         bf = make([]bool, t.numPieces())
912         t._completedPieces.Iterate(func(piece uint32) (again bool) {
913                 bf[piece] = true
914                 return true
915         })
916         return
917 }
918
919 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
920         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
921 }
922
923 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
924         return chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
925 }
926
927 func (t *Torrent) numRequests() RequestIndex {
928         if t.numPieces() == 0 {
929                 return 0
930         }
931         return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
932 }
933
934 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
935         t.dirtyChunks.RemoveRange(
936                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
937                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
938 }
939
940 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
941         if t.info.PieceLength == 0 {
942                 // There will be no variance amongst pieces. Only pain.
943                 return 0
944         }
945         if piece == t.numPieces()-1 {
946                 ret := pp.Integer(*t.length % t.info.PieceLength)
947                 if ret != 0 {
948                         return ret
949                 }
950         }
951         return pp.Integer(t.info.PieceLength)
952 }
953
954 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
955         return &blockCheckingWriter{
956                 cache:        &t.smartBanCache,
957                 requestIndex: t.pieceRequestIndexOffset(piece),
958                 chunkSize:    t.chunkSize.Int(),
959         }
960 }
961
962 func (t *Torrent) hashPiece(piece pieceIndex) (
963         ret metainfo.Hash,
964         // These are peers that sent us blocks that differ from what we hash here.
965         differingPeers map[bannableAddr]struct{},
966         err error,
967 ) {
968         p := t.piece(piece)
969         p.waitNoPendingWrites()
970         storagePiece := t.pieces[piece].Storage()
971
972         // Does the backend want to do its own hashing?
973         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
974                 var sum metainfo.Hash
975                 // log.Printf("A piece decided to self-hash: %d", piece)
976                 sum, err = i.SelfHash()
977                 missinggo.CopyExact(&ret, sum)
978                 return
979         }
980
981         hash := pieceHash.New()
982         const logPieceContents = false
983         smartBanWriter := t.smartBanBlockCheckingWriter(piece)
984         writers := []io.Writer{hash, smartBanWriter}
985         var examineBuf bytes.Buffer
986         if logPieceContents {
987                 writers = append(writers, &examineBuf)
988         }
989         _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
990         if logPieceContents {
991                 t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
992         }
993         smartBanWriter.Flush()
994         differingPeers = smartBanWriter.badPeers
995         missinggo.CopyExact(&ret, hash.Sum(nil))
996         return
997 }
998
999 func (t *Torrent) haveAnyPieces() bool {
1000         return !t._completedPieces.IsEmpty()
1001 }
1002
1003 func (t *Torrent) haveAllPieces() bool {
1004         if !t.haveInfo() {
1005                 return false
1006         }
1007         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
1008 }
1009
1010 func (t *Torrent) havePiece(index pieceIndex) bool {
1011         return t.haveInfo() && t.pieceComplete(index)
1012 }
1013
1014 func (t *Torrent) maybeDropMutuallyCompletePeer(
1015         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
1016         // okay?
1017         p *Peer,
1018 ) {
1019         if !t.cl.config.DropMutuallyCompletePeers {
1020                 return
1021         }
1022         if !t.haveAllPieces() {
1023                 return
1024         }
1025         if all, known := p.peerHasAllPieces(); !(known && all) {
1026                 return
1027         }
1028         if p.useful() {
1029                 return
1030         }
1031         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
1032         p.drop()
1033 }
1034
1035 func (t *Torrent) haveChunk(r Request) (ret bool) {
1036         // defer func() {
1037         //      log.Println("have chunk", r, ret)
1038         // }()
1039         if !t.haveInfo() {
1040                 return false
1041         }
1042         if t.pieceComplete(pieceIndex(r.Index)) {
1043                 return true
1044         }
1045         p := &t.pieces[r.Index]
1046         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
1047 }
1048
1049 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
1050         return chunkIndexType(cs.Begin / chunkSize)
1051 }
1052
1053 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
1054         return t._pendingPieces.Contains(uint32(index))
1055 }
1056
1057 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1058 // conns (which is a map).
1059 var peerConnSlices sync.Pool
1060
1061 func getPeerConnSlice(cap int) []*PeerConn {
1062         getInterface := peerConnSlices.Get()
1063         if getInterface == nil {
1064                 return make([]*PeerConn, 0, cap)
1065         } else {
1066                 return getInterface.([]*PeerConn)[:0]
1067         }
1068 }
1069
1070 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1071 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1072 // established connections for more than a minute. This is O(n log n). If there was a way to not
1073 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1074 func (t *Torrent) worstBadConn() (ret *PeerConn) {
1075         wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
1076         defer peerConnSlices.Put(wcs.conns)
1077         wcs.initKeys()
1078         heap.Init(&wcs)
1079         for wcs.Len() != 0 {
1080                 c := heap.Pop(&wcs).(*PeerConn)
1081                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1082                         return c
1083                 }
1084                 // If the connection is in the worst half of the established
1085                 // connection quota and is older than a minute.
1086                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1087                         // Give connections 1 minute to prove themselves.
1088                         if time.Since(c.completedHandshake) > time.Minute {
1089                                 return c
1090                         }
1091                 }
1092         }
1093         return nil
1094 }
1095
1096 type PieceStateChange struct {
1097         Index int
1098         PieceState
1099 }
1100
1101 func (t *Torrent) publishPieceChange(piece pieceIndex) {
1102         t.cl._mu.Defer(func() {
1103                 cur := t.pieceState(piece)
1104                 p := &t.pieces[piece]
1105                 if cur != p.publicPieceState {
1106                         p.publicPieceState = cur
1107                         t.pieceStateChanges.Publish(PieceStateChange{
1108                                 int(piece),
1109                                 cur,
1110                         })
1111                 }
1112         })
1113 }
1114
1115 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1116         if t.pieceComplete(piece) {
1117                 return 0
1118         }
1119         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1120 }
1121
1122 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1123         return t.pieces[piece].allChunksDirty()
1124 }
1125
1126 func (t *Torrent) readersChanged() {
1127         t.updateReaderPieces()
1128         t.updateAllPiecePriorities("Torrent.readersChanged")
1129 }
1130
1131 func (t *Torrent) updateReaderPieces() {
1132         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1133 }
1134
1135 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1136         if from == to {
1137                 return
1138         }
1139         t.updateReaderPieces()
1140         // Order the ranges, high and low.
1141         l, h := from, to
1142         if l.begin > h.begin {
1143                 l, h = h, l
1144         }
1145         if l.end < h.begin {
1146                 // Two distinct ranges.
1147                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1148                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1149         } else {
1150                 // Ranges overlap.
1151                 end := l.end
1152                 if h.end > end {
1153                         end = h.end
1154                 }
1155                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1156         }
1157 }
1158
1159 func (t *Torrent) maybeNewConns() {
1160         // Tickle the accept routine.
1161         t.cl.event.Broadcast()
1162         t.openNewConns()
1163 }
1164
1165 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
1166         if t._pendingPieces.Contains(uint32(piece)) {
1167                 t.iterPeers(func(c *Peer) {
1168                         // if c.requestState.Interested {
1169                         //      return
1170                         // }
1171                         if !c.isLowOnRequests() {
1172                                 return
1173                         }
1174                         if !c.peerHasPiece(piece) {
1175                                 return
1176                         }
1177                         if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
1178                                 return
1179                         }
1180                         c.updateRequests(reason)
1181                 })
1182         }
1183         t.maybeNewConns()
1184         t.publishPieceChange(piece)
1185 }
1186
1187 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1188         if !t.closed.IsSet() {
1189                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1190                 // request order.
1191                 t.updatePieceRequestOrder(piece)
1192         }
1193         p := &t.pieces[piece]
1194         newPrio := p.uncachedPriority()
1195         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1196         if newPrio == PiecePriorityNone {
1197                 if !t._pendingPieces.CheckedRemove(uint32(piece)) {
1198                         return
1199                 }
1200         } else {
1201                 if !t._pendingPieces.CheckedAdd(uint32(piece)) {
1202                         return
1203                 }
1204         }
1205         t.piecePriorityChanged(piece, reason)
1206 }
1207
1208 func (t *Torrent) updateAllPiecePriorities(reason string) {
1209         t.updatePiecePriorities(0, t.numPieces(), reason)
1210 }
1211
1212 // Update all piece priorities in one hit. This function should have the same
1213 // output as updatePiecePriority, but across all pieces.
1214 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1215         for i := begin; i < end; i++ {
1216                 t.updatePiecePriority(i, reason)
1217         }
1218 }
1219
1220 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1221 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1222         if off >= *t.length {
1223                 return
1224         }
1225         if off < 0 {
1226                 size += off
1227                 off = 0
1228         }
1229         if size <= 0 {
1230                 return
1231         }
1232         begin = pieceIndex(off / t.info.PieceLength)
1233         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1234         if end > pieceIndex(t.info.NumPieces()) {
1235                 end = pieceIndex(t.info.NumPieces())
1236         }
1237         return
1238 }
1239
1240 // Returns true if all iterations complete without breaking. Returns the read regions for all
1241 // readers. The reader regions should not be merged as some callers depend on this method to
1242 // enumerate readers.
1243 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1244         for r := range t.readers {
1245                 p := r.pieces
1246                 if p.begin >= p.end {
1247                         continue
1248                 }
1249                 if !f(p.begin, p.end) {
1250                         return false
1251                 }
1252         }
1253         return true
1254 }
1255
1256 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1257         return t.piece(piece).uncachedPriority()
1258 }
1259
1260 func (t *Torrent) pendRequest(req RequestIndex) {
1261         t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
1262 }
1263
1264 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1265         t.cl.event.Broadcast()
1266         if t.pieceComplete(piece) {
1267                 t.onPieceCompleted(piece)
1268         } else {
1269                 t.onIncompletePiece(piece)
1270         }
1271         t.updatePiecePriority(piece, reason)
1272 }
1273
1274 func (t *Torrent) numReceivedConns() (ret int) {
1275         for c := range t.conns {
1276                 if c.Discovery == PeerSourceIncoming {
1277                         ret++
1278                 }
1279         }
1280         return
1281 }
1282
1283 func (t *Torrent) maxHalfOpen() int {
1284         // Note that if we somehow exceed the maximum established conns, we want
1285         // the negative value to have an effect.
1286         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1287         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1288         // We want to allow some experimentation with new peers, and to try to
1289         // upset an oversupply of received connections.
1290         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1291 }
1292
1293 func (t *Torrent) openNewConns() (initiated int) {
1294         defer t.updateWantPeersEvent()
1295         for t.peers.Len() != 0 {
1296                 if !t.wantConns() {
1297                         return
1298                 }
1299                 if len(t.halfOpen) >= t.maxHalfOpen() {
1300                         return
1301                 }
1302                 if len(t.cl.dialers) == 0 {
1303                         return
1304                 }
1305                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1306                         return
1307                 }
1308                 p := t.peers.PopMax()
1309                 t.initiateConn(p)
1310                 initiated++
1311         }
1312         return
1313 }
1314
1315 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1316         p := t.piece(piece)
1317         uncached := t.pieceCompleteUncached(piece)
1318         cached := p.completion()
1319         changed := cached != uncached
1320         complete := uncached.Complete
1321         p.storageCompletionOk = uncached.Ok
1322         x := uint32(piece)
1323         if complete {
1324                 t._completedPieces.Add(x)
1325                 t.openNewConns()
1326         } else {
1327                 t._completedPieces.Remove(x)
1328         }
1329         p.t.updatePieceRequestOrder(piece)
1330         t.updateComplete()
1331         if complete && len(p.dirtiers) != 0 {
1332                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1333         }
1334         if changed {
1335                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger)
1336                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1337         }
1338         return changed
1339 }
1340
1341 // Non-blocking read. Client lock is not required.
1342 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1343         for len(b) != 0 {
1344                 p := &t.pieces[off/t.info.PieceLength]
1345                 p.waitNoPendingWrites()
1346                 var n1 int
1347                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1348                 if n1 == 0 {
1349                         break
1350                 }
1351                 off += int64(n1)
1352                 n += n1
1353                 b = b[n1:]
1354         }
1355         return
1356 }
1357
1358 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1359 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1360 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1361 func (t *Torrent) maybeCompleteMetadata() error {
1362         if t.haveInfo() {
1363                 // Nothing to do.
1364                 return nil
1365         }
1366         if !t.haveAllMetadataPieces() {
1367                 // Don't have enough metadata pieces.
1368                 return nil
1369         }
1370         err := t.setInfoBytesLocked(t.metadataBytes)
1371         if err != nil {
1372                 t.invalidateMetadata()
1373                 return fmt.Errorf("error setting info bytes: %s", err)
1374         }
1375         if t.cl.config.Debug {
1376                 t.logger.Printf("%s: got metadata from peers", t)
1377         }
1378         return nil
1379 }
1380
1381 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1382         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1383                 if end > begin {
1384                         now.Add(bitmap.BitIndex(begin))
1385                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1386                 }
1387                 return true
1388         })
1389         return
1390 }
1391
1392 func (t *Torrent) needData() bool {
1393         if t.closed.IsSet() {
1394                 return false
1395         }
1396         if !t.haveInfo() {
1397                 return true
1398         }
1399         return !t._pendingPieces.IsEmpty()
1400 }
1401
1402 func appendMissingStrings(old, new []string) (ret []string) {
1403         ret = old
1404 new:
1405         for _, n := range new {
1406                 for _, o := range old {
1407                         if o == n {
1408                                 continue new
1409                         }
1410                 }
1411                 ret = append(ret, n)
1412         }
1413         return
1414 }
1415
1416 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1417         ret = existing
1418         for minNumTiers > len(ret) {
1419                 ret = append(ret, nil)
1420         }
1421         return
1422 }
1423
1424 func (t *Torrent) addTrackers(announceList [][]string) {
1425         fullAnnounceList := &t.metainfo.AnnounceList
1426         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1427         for tierIndex, trackerURLs := range announceList {
1428                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1429         }
1430         t.startMissingTrackerScrapers()
1431         t.updateWantPeersEvent()
1432 }
1433
1434 // Don't call this before the info is available.
1435 func (t *Torrent) bytesCompleted() int64 {
1436         if !t.haveInfo() {
1437                 return 0
1438         }
1439         return *t.length - t.bytesLeft()
1440 }
1441
1442 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1443         t.cl.lock()
1444         defer t.cl.unlock()
1445         return t.setInfoBytesLocked(b)
1446 }
1447
1448 // Returns true if connection is removed from torrent.Conns.
1449 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1450         if !c.closed.IsSet() {
1451                 panic("connection is not closed")
1452                 // There are behaviours prevented by the closed state that will fail
1453                 // if the connection has been deleted.
1454         }
1455         _, ret = t.conns[c]
1456         delete(t.conns, c)
1457         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1458         // the drop event against the PexConnState instead.
1459         if ret {
1460                 if !t.cl.config.DisablePEX {
1461                         t.pex.Drop(c)
1462                 }
1463         }
1464         torrent.Add("deleted connections", 1)
1465         c.deleteAllRequests("Torrent.deletePeerConn")
1466         t.assertPendingRequests()
1467         if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
1468                 panic(t.connsWithAllPieces)
1469         }
1470         return
1471 }
1472
1473 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1474         if t.deleteConnWithAllPieces(p) {
1475                 return
1476         }
1477         if !t.haveInfo() {
1478                 return
1479         }
1480         p.peerPieces().Iterate(func(i uint32) bool {
1481                 p.t.decPieceAvailability(pieceIndex(i))
1482                 return true
1483         })
1484 }
1485
1486 func (t *Torrent) assertPendingRequests() {
1487         if !check {
1488                 return
1489         }
1490         // var actual pendingRequests
1491         // if t.haveInfo() {
1492         //      actual.m = make([]int, t.numRequests())
1493         // }
1494         // t.iterPeers(func(p *Peer) {
1495         //      p.requestState.Requests.Iterate(func(x uint32) bool {
1496         //              actual.Inc(x)
1497         //              return true
1498         //      })
1499         // })
1500         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1501         // if diff != "" {
1502         //      panic(diff)
1503         // }
1504 }
1505
1506 func (t *Torrent) dropConnection(c *PeerConn) {
1507         t.cl.event.Broadcast()
1508         c.close()
1509         if t.deletePeerConn(c) {
1510                 t.openNewConns()
1511         }
1512 }
1513
1514 // Peers as in contact information for dialing out.
1515 func (t *Torrent) wantPeers() bool {
1516         if t.closed.IsSet() {
1517                 return false
1518         }
1519         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1520                 return false
1521         }
1522         return t.wantConns()
1523 }
1524
1525 func (t *Torrent) updateWantPeersEvent() {
1526         if t.wantPeers() {
1527                 t.wantPeersEvent.Set()
1528         } else {
1529                 t.wantPeersEvent.Clear()
1530         }
1531 }
1532
1533 // Returns whether the client should make effort to seed the torrent.
1534 func (t *Torrent) seeding() bool {
1535         cl := t.cl
1536         if t.closed.IsSet() {
1537                 return false
1538         }
1539         if t.dataUploadDisallowed {
1540                 return false
1541         }
1542         if cl.config.NoUpload {
1543                 return false
1544         }
1545         if !cl.config.Seed {
1546                 return false
1547         }
1548         if cl.config.DisableAggressiveUpload && t.needData() {
1549                 return false
1550         }
1551         return true
1552 }
1553
1554 func (t *Torrent) onWebRtcConn(
1555         c datachannel.ReadWriteCloser,
1556         dcc webtorrent.DataChannelContext,
1557 ) {
1558         defer c.Close()
1559         netConn := webrtcNetConn{
1560                 ReadWriteCloser:    c,
1561                 DataChannelContext: dcc,
1562         }
1563         peerRemoteAddr := netConn.RemoteAddr()
1564         if t.cl.badPeerAddr(peerRemoteAddr) {
1565                 return
1566         }
1567         pc, err := t.cl.initiateProtocolHandshakes(
1568                 context.Background(),
1569                 netConn,
1570                 t,
1571                 dcc.LocalOffered,
1572                 false,
1573                 netConn.RemoteAddr(),
1574                 webrtcNetwork,
1575                 fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
1576         )
1577         if err != nil {
1578                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1579                 return
1580         }
1581         if dcc.LocalOffered {
1582                 pc.Discovery = PeerSourceTracker
1583         } else {
1584                 pc.Discovery = PeerSourceIncoming
1585         }
1586         pc.conn.SetWriteDeadline(time.Time{})
1587         t.cl.lock()
1588         defer t.cl.unlock()
1589         err = t.cl.runHandshookConn(pc, t)
1590         if err != nil {
1591                 t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
1592         }
1593 }
1594
1595 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1596         err := t.cl.runHandshookConn(pc, t)
1597         if err != nil || logAll {
1598                 t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
1599         }
1600 }
1601
1602 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1603         t.logRunHandshookConn(pc, false, log.Debug)
1604 }
1605
1606 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1607         wtc, release := t.cl.websocketTrackers.Get(u.String())
1608         go func() {
1609                 <-t.closed.Done()
1610                 release()
1611         }()
1612         wst := websocketTrackerStatus{u, wtc}
1613         go func() {
1614                 err := wtc.Announce(tracker.Started, t.infoHash)
1615                 if err != nil {
1616                         t.logger.WithDefaultLevel(log.Warning).Printf(
1617                                 "error in initial announce to %q: %v",
1618                                 u.String(), err,
1619                         )
1620                 }
1621         }()
1622         return wst
1623 }
1624
1625 func (t *Torrent) startScrapingTracker(_url string) {
1626         if _url == "" {
1627                 return
1628         }
1629         u, err := url.Parse(_url)
1630         if err != nil {
1631                 // URLs with a leading '*' appear to be a uTorrent convention to
1632                 // disable trackers.
1633                 if _url[0] != '*' {
1634                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1635                 }
1636                 return
1637         }
1638         if u.Scheme == "udp" {
1639                 u.Scheme = "udp4"
1640                 t.startScrapingTracker(u.String())
1641                 u.Scheme = "udp6"
1642                 t.startScrapingTracker(u.String())
1643                 return
1644         }
1645         if _, ok := t.trackerAnnouncers[_url]; ok {
1646                 return
1647         }
1648         sl := func() torrentTrackerAnnouncer {
1649                 switch u.Scheme {
1650                 case "ws", "wss":
1651                         if t.cl.config.DisableWebtorrent {
1652                                 return nil
1653                         }
1654                         return t.startWebsocketAnnouncer(*u)
1655                 case "udp4":
1656                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1657                                 return nil
1658                         }
1659                 case "udp6":
1660                         if t.cl.config.DisableIPv6 {
1661                                 return nil
1662                         }
1663                 }
1664                 newAnnouncer := &trackerScraper{
1665                         u:               *u,
1666                         t:               t,
1667                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1668                 }
1669                 go newAnnouncer.Run()
1670                 return newAnnouncer
1671         }()
1672         if sl == nil {
1673                 return
1674         }
1675         if t.trackerAnnouncers == nil {
1676                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1677         }
1678         t.trackerAnnouncers[_url] = sl
1679 }
1680
1681 // Adds and starts tracker scrapers for tracker URLs that aren't already
1682 // running.
1683 func (t *Torrent) startMissingTrackerScrapers() {
1684         if t.cl.config.DisableTrackers {
1685                 return
1686         }
1687         t.startScrapingTracker(t.metainfo.Announce)
1688         for _, tier := range t.metainfo.AnnounceList {
1689                 for _, url := range tier {
1690                         t.startScrapingTracker(url)
1691                 }
1692         }
1693 }
1694
1695 // Returns an AnnounceRequest with fields filled out to defaults and current
1696 // values.
1697 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1698         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1699         // dependent on the network in use.
1700         return tracker.AnnounceRequest{
1701                 Event: event,
1702                 NumWant: func() int32 {
1703                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1704                                 return -1
1705                         } else {
1706                                 return 0
1707                         }
1708                 }(),
1709                 Port:     uint16(t.cl.incomingPeerPort()),
1710                 PeerId:   t.cl.peerID,
1711                 InfoHash: t.infoHash,
1712                 Key:      t.cl.announceKey(),
1713
1714                 // The following are vaguely described in BEP 3.
1715
1716                 Left:     t.bytesLeftAnnounce(),
1717                 Uploaded: t.stats.BytesWrittenData.Int64(),
1718                 // There's no mention of wasted or unwanted download in the BEP.
1719                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1720         }
1721 }
1722
1723 // Adds peers revealed in an announce until the announce ends, or we have
1724 // enough peers.
1725 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1726         cl := t.cl
1727         for v := range pvs {
1728                 cl.lock()
1729                 added := 0
1730                 for _, cp := range v.Peers {
1731                         if cp.Port == 0 {
1732                                 // Can't do anything with this.
1733                                 continue
1734                         }
1735                         if t.addPeer(PeerInfo{
1736                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1737                                 Source: PeerSourceDhtGetPeers,
1738                         }) {
1739                                 added++
1740                         }
1741                 }
1742                 cl.unlock()
1743                 // if added != 0 {
1744                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1745                 // }
1746         }
1747 }
1748
1749 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1750 // announce ends. stop will force the announce to end.
1751 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1752         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1753         if err != nil {
1754                 return
1755         }
1756         _done := make(chan struct{})
1757         done = _done
1758         stop = ps.Close
1759         go func() {
1760                 t.consumeDhtAnnouncePeers(ps.Peers())
1761                 close(_done)
1762         }()
1763         return
1764 }
1765
1766 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1767         _, stop, err := t.AnnounceToDht(s)
1768         if err != nil {
1769                 return err
1770         }
1771         select {
1772         case <-t.closed.Done():
1773         case <-time.After(5 * time.Minute):
1774         }
1775         stop()
1776         return nil
1777 }
1778
1779 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1780         cl := t.cl
1781         cl.lock()
1782         defer cl.unlock()
1783         for {
1784                 for {
1785                         if t.closed.IsSet() {
1786                                 return
1787                         }
1788                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
1789                         // TODO: We can include the announce_peer step depending on whether we can receive
1790                         // inbound connections. We should probably only announce once every 15 mins too.
1791                         if !t.wantConns() {
1792                                 goto wait
1793                         }
1794                         // TODO: Determine if there's a listener on the port we're announcing.
1795                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1796                                 goto wait
1797                         }
1798                         break
1799                 wait:
1800                         cl.event.Wait()
1801                 }
1802                 func() {
1803                         t.numDHTAnnounces++
1804                         cl.unlock()
1805                         defer cl.lock()
1806                         err := t.timeboxedAnnounceToDht(s)
1807                         if err != nil {
1808                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1809                         }
1810                 }()
1811         }
1812 }
1813
1814 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1815         for _, p := range peers {
1816                 if t.addPeer(p) {
1817                         added++
1818                 }
1819         }
1820         return
1821 }
1822
1823 // The returned TorrentStats may require alignment in memory. See
1824 // https://github.com/anacrolix/torrent/issues/383.
1825 func (t *Torrent) Stats() TorrentStats {
1826         t.cl.rLock()
1827         defer t.cl.rUnlock()
1828         return t.statsLocked()
1829 }
1830
1831 func (t *Torrent) statsLocked() (ret TorrentStats) {
1832         ret.ActivePeers = len(t.conns)
1833         ret.HalfOpenPeers = len(t.halfOpen)
1834         ret.PendingPeers = t.peers.Len()
1835         ret.TotalPeers = t.numTotalPeers()
1836         ret.ConnectedSeeders = 0
1837         for c := range t.conns {
1838                 if all, ok := c.peerHasAllPieces(); all && ok {
1839                         ret.ConnectedSeeders++
1840                 }
1841         }
1842         ret.ConnStats = t.stats.Copy()
1843         ret.PiecesComplete = t.numPiecesCompleted()
1844         return
1845 }
1846
1847 // The total number of peers in the torrent.
1848 func (t *Torrent) numTotalPeers() int {
1849         peers := make(map[string]struct{})
1850         for conn := range t.conns {
1851                 ra := conn.conn.RemoteAddr()
1852                 if ra == nil {
1853                         // It's been closed and doesn't support RemoteAddr.
1854                         continue
1855                 }
1856                 peers[ra.String()] = struct{}{}
1857         }
1858         for addr := range t.halfOpen {
1859                 peers[addr] = struct{}{}
1860         }
1861         t.peers.Each(func(peer PeerInfo) {
1862                 peers[peer.Addr.String()] = struct{}{}
1863         })
1864         return len(peers)
1865 }
1866
1867 // Reconcile bytes transferred before connection was associated with a
1868 // torrent.
1869 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1870         if c._stats != (ConnStats{
1871                 // Handshakes should only increment these fields:
1872                 BytesWritten: c._stats.BytesWritten,
1873                 BytesRead:    c._stats.BytesRead,
1874         }) {
1875                 panic("bad stats")
1876         }
1877         c.postHandshakeStats(func(cs *ConnStats) {
1878                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1879                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1880         })
1881         c.reconciledHandshakeStats = true
1882 }
1883
1884 // Returns true if the connection is added.
1885 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1886         defer func() {
1887                 if err == nil {
1888                         torrent.Add("added connections", 1)
1889                 }
1890         }()
1891         if t.closed.IsSet() {
1892                 return errors.New("torrent closed")
1893         }
1894         for c0 := range t.conns {
1895                 if c.PeerID != c0.PeerID {
1896                         continue
1897                 }
1898                 if !t.cl.config.DropDuplicatePeerIds {
1899                         continue
1900                 }
1901                 if c.hasPreferredNetworkOver(c0) {
1902                         c0.close()
1903                         t.deletePeerConn(c0)
1904                 } else {
1905                         return errors.New("existing connection preferred")
1906                 }
1907         }
1908         if len(t.conns) >= t.maxEstablishedConns {
1909                 c := t.worstBadConn()
1910                 if c == nil {
1911                         return errors.New("don't want conns")
1912                 }
1913                 c.close()
1914                 t.deletePeerConn(c)
1915         }
1916         if len(t.conns) >= t.maxEstablishedConns {
1917                 panic(len(t.conns))
1918         }
1919         t.conns[c] = struct{}{}
1920         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
1921                 t.pex.Add(c) // as no further extended handshake expected
1922         }
1923         return nil
1924 }
1925
1926 func (t *Torrent) wantConns() bool {
1927         if !t.networkingEnabled.Bool() {
1928                 return false
1929         }
1930         if t.closed.IsSet() {
1931                 return false
1932         }
1933         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
1934                 return false
1935         }
1936         return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
1937 }
1938
1939 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1940         t.cl.lock()
1941         defer t.cl.unlock()
1942         oldMax = t.maxEstablishedConns
1943         t.maxEstablishedConns = max
1944         wcs := worseConnSlice{
1945                 conns: t.appendConns(nil, func(*PeerConn) bool {
1946                         return true
1947                 }),
1948         }
1949         wcs.initKeys()
1950         heap.Init(&wcs)
1951         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1952                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
1953         }
1954         t.openNewConns()
1955         return oldMax
1956 }
1957
1958 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
1959         t.logger.LazyLog(log.Debug, func() log.Msg {
1960                 return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
1961         })
1962         p := t.piece(piece)
1963         p.numVerifies++
1964         t.cl.event.Broadcast()
1965         if t.closed.IsSet() {
1966                 return
1967         }
1968
1969         // Don't score the first time a piece is hashed, it could be an initial check.
1970         if p.storageCompletionOk {
1971                 if passed {
1972                         pieceHashedCorrect.Add(1)
1973                 } else {
1974                         log.Fmsg(
1975                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
1976                         ).AddValues(t, p).LogLevel(
1977
1978                                 log.Debug, t.logger)
1979
1980                         pieceHashedNotCorrect.Add(1)
1981                 }
1982         }
1983
1984         p.marking = true
1985         t.publishPieceChange(piece)
1986         defer func() {
1987                 p.marking = false
1988                 t.publishPieceChange(piece)
1989         }()
1990
1991         if passed {
1992                 if len(p.dirtiers) != 0 {
1993                         // Don't increment stats above connection-level for every involved connection.
1994                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1995                 }
1996                 for c := range p.dirtiers {
1997                         c._stats.incrementPiecesDirtiedGood()
1998                 }
1999                 t.clearPieceTouchers(piece)
2000                 t.cl.unlock()
2001                 err := p.Storage().MarkComplete()
2002                 if err != nil {
2003                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
2004                 }
2005                 t.cl.lock()
2006
2007                 if t.closed.IsSet() {
2008                         return
2009                 }
2010                 t.pendAllChunkSpecs(piece)
2011         } else {
2012                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
2013                         // Peers contributed to all the data for this piece hash failure, and the failure was
2014                         // not due to errors in the storage (such as data being dropped in a cache).
2015
2016                         // Increment Torrent and above stats, and then specific connections.
2017                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
2018                         for c := range p.dirtiers {
2019                                 // Y u do dis peer?!
2020                                 c.stats().incrementPiecesDirtiedBad()
2021                         }
2022
2023                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
2024                         for c := range p.dirtiers {
2025                                 if !c.trusted {
2026                                         bannableTouchers = append(bannableTouchers, c)
2027                                 }
2028                         }
2029                         t.clearPieceTouchers(piece)
2030                         slices.Sort(bannableTouchers, connLessTrusted)
2031
2032                         if t.cl.config.Debug {
2033                                 t.logger.Printf(
2034                                         "bannable conns by trust for piece %d: %v",
2035                                         piece,
2036                                         func() (ret []connectionTrust) {
2037                                                 for _, c := range bannableTouchers {
2038                                                         ret = append(ret, c.trust())
2039                                                 }
2040                                                 return
2041                                         }(),
2042                                 )
2043                         }
2044
2045                         if len(bannableTouchers) >= 1 {
2046                                 c := bannableTouchers[0]
2047                                 if len(bannableTouchers) != 1 {
2048                                         t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
2049                                 } else {
2050                                         // Turns out it's still useful to ban peers like this because if there's only a
2051                                         // single peer for a piece, and we never progress that piece to completion, we
2052                                         // will never smart-ban them. Discovered in
2053                                         // https://github.com/anacrolix/torrent/issues/715.
2054                                         t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
2055                                         c.ban()
2056                                 }
2057                         }
2058                 }
2059                 t.onIncompletePiece(piece)
2060                 p.Storage().MarkNotComplete()
2061         }
2062         t.updatePieceCompletion(piece)
2063 }
2064
2065 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
2066         for ri := t.pieceRequestIndexOffset(piece); ri < t.pieceRequestIndexOffset(piece+1); ri++ {
2067                 t.cancelRequest(ri)
2068         }
2069 }
2070
2071 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
2072         t.pendAllChunkSpecs(piece)
2073         t.cancelRequestsForPiece(piece)
2074         t.piece(piece).readerCond.Broadcast()
2075         for conn := range t.conns {
2076                 conn.have(piece)
2077                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
2078         }
2079 }
2080
2081 // Called when a piece is found to be not complete.
2082 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2083         if t.pieceAllDirty(piece) {
2084                 t.pendAllChunkSpecs(piece)
2085         }
2086         if !t.wantPieceIndex(piece) {
2087                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2088                 return
2089         }
2090         // We could drop any connections that we told we have a piece that we
2091         // don't here. But there's a test failure, and it seems clients don't care
2092         // if you request pieces that you already claim to have. Pruning bad
2093         // connections might just remove any connections that aren't treating us
2094         // favourably anyway.
2095
2096         // for c := range t.conns {
2097         //      if c.sentHave(piece) {
2098         //              c.drop()
2099         //      }
2100         // }
2101         t.iterPeers(func(conn *Peer) {
2102                 if conn.peerHasPiece(piece) {
2103                         conn.updateRequests("piece incomplete")
2104                 }
2105         })
2106 }
2107
2108 func (t *Torrent) tryCreateMorePieceHashers() {
2109         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
2110         }
2111 }
2112
2113 func (t *Torrent) tryCreatePieceHasher() bool {
2114         if t.storage == nil {
2115                 return false
2116         }
2117         pi, ok := t.getPieceToHash()
2118         if !ok {
2119                 return false
2120         }
2121         p := t.piece(pi)
2122         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2123         p.hashing = true
2124         t.publishPieceChange(pi)
2125         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2126         t.storageLock.RLock()
2127         t.activePieceHashes++
2128         go t.pieceHasher(pi)
2129         return true
2130 }
2131
2132 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2133         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2134                 if t.piece(i).hashing {
2135                         return true
2136                 }
2137                 ret = i
2138                 ok = true
2139                 return false
2140         })
2141         return
2142 }
2143
2144 func (t *Torrent) dropBannedPeers() {
2145         t.iterPeers(func(p *Peer) {
2146                 remoteIp := p.remoteIp()
2147                 if remoteIp == nil {
2148                         if p.bannableAddr.Ok() {
2149                                 t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
2150                         }
2151                         return
2152                 }
2153                 netipAddr := netip.MustParseAddr(remoteIp.String())
2154                 if Some(netipAddr) != p.bannableAddr {
2155                         t.logger.WithDefaultLevel(log.Debug).Printf(
2156                                 "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
2157                                 p, remoteIp, p.bannableAddr)
2158                 }
2159                 if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
2160                         // Should this be a close?
2161                         p.drop()
2162                         t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
2163                 }
2164         })
2165 }
2166
2167 func (t *Torrent) pieceHasher(index pieceIndex) {
2168         p := t.piece(index)
2169         sum, failedPeers, copyErr := t.hashPiece(index)
2170         correct := sum == *p.hash
2171         switch copyErr {
2172         case nil, io.EOF:
2173         default:
2174                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2175         }
2176         t.storageLock.RUnlock()
2177         t.cl.lock()
2178         defer t.cl.unlock()
2179         if correct {
2180                 for peer := range failedPeers {
2181                         t.cl.banPeerIP(peer.AsSlice())
2182                         t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
2183                 }
2184                 t.dropBannedPeers()
2185                 for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
2186                         t.smartBanCache.ForgetBlock(ri)
2187                 }
2188         }
2189         p.hashing = false
2190         t.pieceHashed(index, correct, copyErr)
2191         t.updatePiecePriority(index, "Torrent.pieceHasher")
2192         t.activePieceHashes--
2193         t.tryCreateMorePieceHashers()
2194 }
2195
2196 // Return the connections that touched a piece, and clear the entries while doing it.
2197 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2198         p := t.piece(pi)
2199         for c := range p.dirtiers {
2200                 delete(c.peerTouchedPieces, pi)
2201                 delete(p.dirtiers, c)
2202         }
2203 }
2204
2205 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2206         t.iterPeers(func(p *Peer) {
2207                 ret = append(ret, p)
2208         })
2209         return
2210 }
2211
2212 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2213         piece := t.piece(pieceIndex)
2214         if piece.queuedForHash() {
2215                 return
2216         }
2217         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2218         t.publishPieceChange(pieceIndex)
2219         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2220         t.tryCreateMorePieceHashers()
2221 }
2222
2223 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2224 // before the Info is available.
2225 func (t *Torrent) VerifyData() {
2226         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2227                 t.Piece(i).VerifyData()
2228         }
2229 }
2230
2231 // Start the process of connecting to the given peer for the given torrent if appropriate.
2232 func (t *Torrent) initiateConn(peer PeerInfo) {
2233         if peer.Id == t.cl.peerID {
2234                 return
2235         }
2236         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2237                 return
2238         }
2239         addr := peer.Addr
2240         if t.addrActive(addr.String()) {
2241                 return
2242         }
2243         t.cl.numHalfOpen++
2244         t.halfOpen[addr.String()] = peer
2245         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
2246 }
2247
2248 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2249 // quickly make one Client visible to the Torrent of another Client.
2250 func (t *Torrent) AddClientPeer(cl *Client) int {
2251         return t.AddPeers(func() (ps []PeerInfo) {
2252                 for _, la := range cl.ListenAddrs() {
2253                         ps = append(ps, PeerInfo{
2254                                 Addr:    la,
2255                                 Trusted: true,
2256                         })
2257                 }
2258                 return
2259         }())
2260 }
2261
2262 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2263 // connection.
2264 func (t *Torrent) allStats(f func(*ConnStats)) {
2265         f(&t.stats)
2266         f(&t.cl.stats)
2267 }
2268
2269 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2270         return t.pieces[i].hashing
2271 }
2272
2273 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2274         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2275 }
2276
2277 func (t *Torrent) dialTimeout() time.Duration {
2278         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2279 }
2280
2281 func (t *Torrent) piece(i int) *Piece {
2282         return &t.pieces[i]
2283 }
2284
2285 func (t *Torrent) onWriteChunkErr(err error) {
2286         if t.userOnWriteChunkErr != nil {
2287                 go t.userOnWriteChunkErr(err)
2288                 return
2289         }
2290         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2291         t.disallowDataDownloadLocked()
2292 }
2293
2294 func (t *Torrent) DisallowDataDownload() {
2295         t.disallowDataDownloadLocked()
2296 }
2297
2298 func (t *Torrent) disallowDataDownloadLocked() {
2299         t.dataDownloadDisallowed.Set()
2300 }
2301
2302 func (t *Torrent) AllowDataDownload() {
2303         t.dataDownloadDisallowed.Clear()
2304 }
2305
2306 // Enables uploading data, if it was disabled.
2307 func (t *Torrent) AllowDataUpload() {
2308         t.cl.lock()
2309         defer t.cl.unlock()
2310         t.dataUploadDisallowed = false
2311         for c := range t.conns {
2312                 c.updateRequests("allow data upload")
2313         }
2314 }
2315
2316 // Disables uploading data, if it was enabled.
2317 func (t *Torrent) DisallowDataUpload() {
2318         t.cl.lock()
2319         defer t.cl.unlock()
2320         t.dataUploadDisallowed = true
2321         for c := range t.conns {
2322                 // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
2323                 c.updateRequests("disallow data upload")
2324         }
2325 }
2326
2327 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2328 // or if nil, a critical message is logged, and data download is disabled.
2329 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2330         t.cl.lock()
2331         defer t.cl.unlock()
2332         t.userOnWriteChunkErr = f
2333 }
2334
2335 func (t *Torrent) iterPeers(f func(p *Peer)) {
2336         for pc := range t.conns {
2337                 f(&pc.Peer)
2338         }
2339         for _, ws := range t.webSeeds {
2340                 f(ws)
2341         }
2342 }
2343
2344 func (t *Torrent) callbacks() *Callbacks {
2345         return &t.cl.config.Callbacks
2346 }
2347
2348 type AddWebSeedsOpt func(*webseed.Client)
2349
2350 // Sets the WebSeed trailing path escaper for a webseed.Client.
2351 func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
2352         return func(c *webseed.Client) {
2353                 c.PathEscaper = custom
2354         }
2355 }
2356
2357 func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
2358         t.cl.lock()
2359         defer t.cl.unlock()
2360         for _, u := range urls {
2361                 t.addWebSeed(u, opts...)
2362         }
2363 }
2364
2365 func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
2366         if t.cl.config.DisableWebseeds {
2367                 return
2368         }
2369         if _, ok := t.webSeeds[url]; ok {
2370                 return
2371         }
2372         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2373         // right away. This value should be some multiple of the number of connections to a host. I
2374         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2375         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2376         // "https://webtorrent.io/torrents/".
2377         const maxRequests = 16
2378         ws := webseedPeer{
2379                 peer: Peer{
2380                         t:                        t,
2381                         outgoing:                 true,
2382                         Network:                  "http",
2383                         reconciledHandshakeStats: true,
2384                         // This should affect how often we have to recompute requests for this peer. Note that
2385                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2386                         // requests mark more often, so recomputation is probably sooner than with regular peer
2387                         // conns. ~4x maxRequests would be about right.
2388                         PeerMaxRequests: 128,
2389                         // TODO: Set ban prefix?
2390                         RemoteAddr: remoteAddrFromUrl(url),
2391                         callbacks:  t.callbacks(),
2392                 },
2393                 client: webseed.Client{
2394                         HttpClient: t.cl.httpClient,
2395                         Url:        url,
2396                         ResponseBodyWrapper: func(r io.Reader) io.Reader {
2397                                 return &rateLimitedReader{
2398                                         l: t.cl.config.DownloadRateLimiter,
2399                                         r: r,
2400                                 }
2401                         },
2402                 },
2403                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2404                 maxRequests:    maxRequests,
2405         }
2406         ws.peer.initRequestState()
2407         for _, opt := range opts {
2408                 opt(&ws.client)
2409         }
2410         ws.peer.initUpdateRequestsTimer()
2411         ws.requesterCond.L = t.cl.locker()
2412         for i := 0; i < maxRequests; i += 1 {
2413                 go ws.requester(i)
2414         }
2415         for _, f := range t.callbacks().NewPeer {
2416                 f(&ws.peer)
2417         }
2418         ws.peer.logger = t.logger.WithContextValue(&ws)
2419         ws.peer.peerImpl = &ws
2420         if t.haveInfo() {
2421                 ws.onGotInfo(t.info)
2422         }
2423         t.webSeeds[url] = &ws.peer
2424 }
2425
2426 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2427         t.iterPeers(func(p1 *Peer) {
2428                 if p1 == p {
2429                         active = true
2430                 }
2431         })
2432         return
2433 }
2434
2435 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2436         index := t.pieceIndexOfRequestIndex(ri)
2437         return Request{
2438                 pp.Integer(index),
2439                 t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2440         }
2441 }
2442
2443 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2444         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
2445 }
2446
2447 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2448         return RequestIndex(piece) * t.chunksPerRegularPiece()
2449 }
2450
2451 func (t *Torrent) updateComplete() {
2452         t.Complete.SetBool(t.haveAllPieces())
2453 }
2454
2455 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
2456         p := t.pendingRequests[r]
2457         if p != nil {
2458                 p.cancel(r)
2459         }
2460         delete(t.pendingRequests, r)
2461         return p
2462 }
2463
2464 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
2465         return t.pendingRequests[r]
2466 }
2467
2468 func (t *Torrent) addConnWithAllPieces(p *Peer) {
2469         if t.connsWithAllPieces == nil {
2470                 t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
2471         }
2472         t.connsWithAllPieces[p] = struct{}{}
2473 }
2474
2475 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
2476         _, ok := t.connsWithAllPieces[p]
2477         delete(t.connsWithAllPieces, p)
2478         return ok
2479 }
2480
2481 func (t *Torrent) numActivePeers() int {
2482         return len(t.conns) + len(t.webSeeds)
2483 }
2484
2485 func (t *Torrent) hasStorageCap() bool {
2486         f := t.storage.Capacity
2487         if f == nil {
2488                 return false
2489         }
2490         _, ok := (*f)()
2491         return ok
2492 }
2493
2494 func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
2495         return pieceIndex(ri / t.chunksPerRegularPiece())
2496 }