]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Switch requestState to be a slice
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "net/netip"
12         "net/url"
13         "sort"
14         "strings"
15         "text/tabwriter"
16         "time"
17         "unsafe"
18
19         "github.com/RoaringBitmap/roaring"
20         "github.com/anacrolix/chansync"
21         "github.com/anacrolix/chansync/events"
22         "github.com/anacrolix/dht/v2"
23         . "github.com/anacrolix/generics"
24         "github.com/anacrolix/log"
25         "github.com/anacrolix/missinggo/perf"
26         "github.com/anacrolix/missinggo/slices"
27         "github.com/anacrolix/missinggo/v2"
28         "github.com/anacrolix/missinggo/v2/bitmap"
29         "github.com/anacrolix/missinggo/v2/pubsub"
30         "github.com/anacrolix/multiless"
31         "github.com/anacrolix/sync"
32         request_strategy "github.com/anacrolix/torrent/request-strategy"
33         typedRoaring "github.com/anacrolix/torrent/typed-roaring"
34         "github.com/davecgh/go-spew/spew"
35         "github.com/pion/datachannel"
36
37         "github.com/anacrolix/torrent/bencode"
38         "github.com/anacrolix/torrent/common"
39         "github.com/anacrolix/torrent/metainfo"
40         pp "github.com/anacrolix/torrent/peer_protocol"
41         "github.com/anacrolix/torrent/segments"
42         "github.com/anacrolix/torrent/storage"
43         "github.com/anacrolix/torrent/tracker"
44         "github.com/anacrolix/torrent/webseed"
45         "github.com/anacrolix/torrent/webtorrent"
46 )
47
48 // Maintains state of torrent within a Client. Many methods should not be called before the info is
49 // available, see .Info and .GotInfo.
50 type Torrent struct {
51         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
52         // alignment. See #262.
53         stats  ConnStats
54         cl     *Client
55         logger log.Logger
56
57         networkingEnabled      chansync.Flag
58         dataDownloadDisallowed chansync.Flag
59         dataUploadDisallowed   bool
60         userOnWriteChunkErr    func(error)
61
62         closed   chansync.SetOnce
63         infoHash metainfo.Hash
64         pieces   []Piece
65         // Values are the piece indices that changed.
66         pieceStateChanges pubsub.PubSub[PieceStateChange]
67         // The size of chunks to request from peers over the wire. This is
68         // normally 16KiB by convention these days.
69         chunkSize pp.Integer
70         chunkPool sync.Pool
71         // Total length of the torrent in bytes. Stored because it's not O(1) to
72         // get this from the info dict.
73         length *int64
74
75         // The storage to open when the info dict becomes available.
76         storageOpener *storage.Client
77         // Storage for torrent data.
78         storage *storage.Torrent
79         // Read-locked for using storage, and write-locked for Closing.
80         storageLock sync.RWMutex
81
82         // TODO: Only announce stuff is used?
83         metainfo metainfo.MetaInfo
84
85         // The info dict. nil if we don't have it (yet).
86         info      *metainfo.Info
87         fileIndex segments.Index
88         files     *[]*File
89
90         _chunksPerRegularPiece chunkIndexType
91
92         webSeeds map[string]*Peer
93         // Active peer connections, running message stream loops. TODO: Make this
94         // open (not-closed) connections only.
95         conns               map[*PeerConn]struct{}
96         maxEstablishedConns int
97         // Set of addrs to which we're attempting to connect. Connections are
98         // half-open until all handshakes are completed.
99         halfOpen map[string]PeerInfo
100
101         // Reserve of peers to connect to. A peer can be both here and in the
102         // active connections if were told about the peer after connecting with
103         // them. That encourages us to reconnect to peers that are well known in
104         // the swarm.
105         peers prioritizedPeers
106         // Whether we want to know to know more peers.
107         wantPeersEvent missinggo.Event
108         // An announcer for each tracker URL.
109         trackerAnnouncers map[string]torrentTrackerAnnouncer
110         // How many times we've initiated a DHT announce. TODO: Move into stats.
111         numDHTAnnounces int
112
113         // Name used if the info name isn't available. Should be cleared when the
114         // Info does become available.
115         nameMu      sync.RWMutex
116         displayName string
117
118         // The bencoded bytes of the info dict. This is actively manipulated if
119         // the info bytes aren't initially available, and we try to fetch them
120         // from peers.
121         metadataBytes []byte
122         // Each element corresponds to the 16KiB metadata pieces. If true, we have
123         // received that piece.
124         metadataCompletedChunks []bool
125         metadataChanged         sync.Cond
126
127         // Closed when .Info is obtained.
128         gotMetainfoC chan struct{}
129
130         readers                map[*reader]struct{}
131         _readerNowPieces       bitmap.Bitmap
132         _readerReadaheadPieces bitmap.Bitmap
133
134         // A cache of pieces we need to get. Calculated from various piece and
135         // file priorities and completion states elsewhere.
136         _pendingPieces roaring.Bitmap
137         // A cache of completed piece indices.
138         _completedPieces roaring.Bitmap
139         // Pieces that need to be hashed.
140         piecesQueuedForHash       bitmap.Bitmap
141         activePieceHashes         int
142         initialPieceCheckDisabled bool
143
144         connsWithAllPieces map[*Peer]struct{}
145
146         requestState []requestState
147         // Chunks we've written to since the corresponding piece was last checked.
148         dirtyChunks typedRoaring.Bitmap[RequestIndex]
149
150         pex pexState
151
152         // Is On when all pieces are complete.
153         Complete chansync.Flag
154
155         // Torrent sources in use keyed by the source string.
156         activeSources sync.Map
157         sourcesLogger log.Logger
158
159         smartBanCache smartBanCache
160 }
161
162 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
163         // This could be done with roaring.BitSliceIndexing.
164         t.iterPeers(func(peer *Peer) {
165                 if _, ok := t.connsWithAllPieces[peer]; ok {
166                         return
167                 }
168                 if peer.peerHasPiece(i) {
169                         count++
170                 }
171         })
172         return
173 }
174
175 func (t *Torrent) decPieceAvailability(i pieceIndex) {
176         if !t.haveInfo() {
177                 return
178         }
179         p := t.piece(i)
180         if p.relativeAvailability <= 0 {
181                 panic(p.relativeAvailability)
182         }
183         p.relativeAvailability--
184         t.updatePieceRequestOrder(i)
185 }
186
187 func (t *Torrent) incPieceAvailability(i pieceIndex) {
188         // If we don't the info, this should be reconciled when we do.
189         if t.haveInfo() {
190                 p := t.piece(i)
191                 p.relativeAvailability++
192                 t.updatePieceRequestOrder(i)
193         }
194 }
195
196 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
197         return t._readerNowPieces
198 }
199
200 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
201         return t._readerReadaheadPieces
202 }
203
204 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
205         return !t.wantPieceIndex(i)
206 }
207
208 // Returns a channel that is closed when the Torrent is closed.
209 func (t *Torrent) Closed() events.Done {
210         return t.closed.Done()
211 }
212
213 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
214 // pending, and half-open peers.
215 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
216         // Add pending peers to the list
217         t.peers.Each(func(peer PeerInfo) {
218                 ks = append(ks, peer)
219         })
220
221         // Add half-open peers to the list
222         for _, peer := range t.halfOpen {
223                 ks = append(ks, peer)
224         }
225
226         // Add active peers to the list
227         for conn := range t.conns {
228                 ks = append(ks, PeerInfo{
229                         Id:     conn.PeerID,
230                         Addr:   conn.RemoteAddr,
231                         Source: conn.Discovery,
232                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
233                         // > But if we're not connected to them with an encrypted connection, I couldn't say
234                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
235                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
236                         // > connection. It's probably easiest to do the latter for now.
237                         // https://github.com/anacrolix/torrent/pull/188
238                         SupportsEncryption: conn.headerEncrypted,
239                 })
240         }
241
242         return
243 }
244
245 func (t *Torrent) setChunkSize(size pp.Integer) {
246         t.chunkSize = size
247         t.chunkPool = sync.Pool{
248                 New: func() interface{} {
249                         b := make([]byte, size)
250                         return &b
251                 },
252         }
253 }
254
255 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
256         return t._completedPieces.Contains(bitmap.BitIndex(piece))
257 }
258
259 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
260         if t.storage == nil {
261                 return storage.Completion{Complete: false, Ok: true}
262         }
263         return t.pieces[piece].Storage().Completion()
264 }
265
266 // There's a connection to that address already.
267 func (t *Torrent) addrActive(addr string) bool {
268         if _, ok := t.halfOpen[addr]; ok {
269                 return true
270         }
271         for c := range t.conns {
272                 ra := c.RemoteAddr
273                 if ra.String() == addr {
274                         return true
275                 }
276         }
277         return false
278 }
279
280 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
281         return t.appendConns(ret, func(conn *PeerConn) bool {
282                 return !conn.closed.IsSet()
283         })
284 }
285
286 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
287         for c := range t.conns {
288                 if f(c) {
289                         ret = append(ret, c)
290                 }
291         }
292         return ret
293 }
294
295 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
296         cl := t.cl
297         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
298         if t.closed.IsSet() {
299                 return false
300         }
301         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
302                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
303                         torrent.Add("peers not added because of bad addr", 1)
304                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
305                         return false
306                 }
307         }
308         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
309                 torrent.Add("peers replaced", 1)
310                 if !replaced.equal(p) {
311                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
312                         added = true
313                 }
314         } else {
315                 added = true
316         }
317         t.openNewConns()
318         for t.peers.Len() > cl.config.TorrentPeersHighWater {
319                 _, ok := t.peers.DeleteMin()
320                 if ok {
321                         torrent.Add("excess reserve peers discarded", 1)
322                 }
323         }
324         return
325 }
326
327 func (t *Torrent) invalidateMetadata() {
328         for i := 0; i < len(t.metadataCompletedChunks); i++ {
329                 t.metadataCompletedChunks[i] = false
330         }
331         t.nameMu.Lock()
332         t.gotMetainfoC = make(chan struct{})
333         t.info = nil
334         t.nameMu.Unlock()
335 }
336
337 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
338         if t.haveInfo() {
339                 return
340         }
341         if index >= len(t.metadataCompletedChunks) {
342                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
343                 return
344         }
345         copy(t.metadataBytes[(1<<14)*index:], data)
346         t.metadataCompletedChunks[index] = true
347 }
348
349 func (t *Torrent) metadataPieceCount() int {
350         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
351 }
352
353 func (t *Torrent) haveMetadataPiece(piece int) bool {
354         if t.haveInfo() {
355                 return (1<<14)*piece < len(t.metadataBytes)
356         } else {
357                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
358         }
359 }
360
361 func (t *Torrent) metadataSize() int {
362         return len(t.metadataBytes)
363 }
364
365 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
366         for i := 0; i < len(info.Pieces); i += sha1.Size {
367                 ret = append(ret, info.Pieces[i:i+sha1.Size])
368         }
369         return
370 }
371
372 func (t *Torrent) makePieces() {
373         hashes := infoPieceHashes(t.info)
374         t.pieces = make([]Piece, len(hashes))
375         for i, hash := range hashes {
376                 piece := &t.pieces[i]
377                 piece.t = t
378                 piece.index = pieceIndex(i)
379                 piece.noPendingWrites.L = &piece.pendingWritesMutex
380                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
381                 files := *t.files
382                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
383                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
384                 piece.files = files[beginFile:endFile]
385                 piece.undirtiedChunksIter = undirtiedChunksIter{
386                         TorrentDirtyChunks: &t.dirtyChunks,
387                         StartRequestIndex:  piece.requestIndexOffset(),
388                         EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
389                 }
390         }
391 }
392
393 // Returns the index of the first file containing the piece. files must be
394 // ordered by offset.
395 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
396         for i, f := range files {
397                 if f.offset+f.length > pieceOffset {
398                         return i
399                 }
400         }
401         return 0
402 }
403
404 // Returns the index after the last file containing the piece. files must be
405 // ordered by offset.
406 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
407         for i, f := range files {
408                 if f.offset+f.length >= pieceEndOffset {
409                         return i + 1
410                 }
411         }
412         return 0
413 }
414
415 func (t *Torrent) cacheLength() {
416         var l int64
417         for _, f := range t.info.UpvertedFiles() {
418                 l += f.Length
419         }
420         t.length = &l
421 }
422
423 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
424 // separately.
425 func (t *Torrent) setInfo(info *metainfo.Info) error {
426         if err := validateInfo(info); err != nil {
427                 return fmt.Errorf("bad info: %s", err)
428         }
429         if t.storageOpener != nil {
430                 var err error
431                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
432                 if err != nil {
433                         return fmt.Errorf("error opening torrent storage: %s", err)
434                 }
435         }
436         t.nameMu.Lock()
437         t.info = info
438         t.nameMu.Unlock()
439         t._chunksPerRegularPiece = chunkIndexType((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
440         t.updateComplete()
441         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
442         t.displayName = "" // Save a few bytes lol.
443         t.initFiles()
444         t.cacheLength()
445         t.makePieces()
446         return nil
447 }
448
449 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
450         return request_strategy.PieceRequestOrderKey{
451                 InfoHash: t.infoHash,
452                 Index:    i,
453         }
454 }
455
456 // This seems to be all the follow-up tasks after info is set, that can't fail.
457 func (t *Torrent) onSetInfo() {
458         t.initPieceRequestOrder()
459         MakeSliceWithLength(&t.requestState, t.numChunks())
460         for i := range t.pieces {
461                 p := &t.pieces[i]
462                 // Need to add relativeAvailability before updating piece completion, as that may result in conns
463                 // being dropped.
464                 if p.relativeAvailability != 0 {
465                         panic(p.relativeAvailability)
466                 }
467                 p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
468                 t.addRequestOrderPiece(i)
469                 t.updatePieceCompletion(i)
470                 if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
471                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
472                         t.queuePieceCheck(i)
473                 }
474         }
475         t.cl.event.Broadcast()
476         close(t.gotMetainfoC)
477         t.updateWantPeersEvent()
478         t.tryCreateMorePieceHashers()
479         t.iterPeers(func(p *Peer) {
480                 p.onGotInfo(t.info)
481                 p.updateRequests("onSetInfo")
482         })
483 }
484
485 // Called when metadata for a torrent becomes available.
486 func (t *Torrent) setInfoBytesLocked(b []byte) error {
487         if metainfo.HashBytes(b) != t.infoHash {
488                 return errors.New("info bytes have wrong hash")
489         }
490         var info metainfo.Info
491         if err := bencode.Unmarshal(b, &info); err != nil {
492                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
493         }
494         t.metadataBytes = b
495         t.metadataCompletedChunks = nil
496         if t.info != nil {
497                 return nil
498         }
499         if err := t.setInfo(&info); err != nil {
500                 return err
501         }
502         t.onSetInfo()
503         return nil
504 }
505
506 func (t *Torrent) haveAllMetadataPieces() bool {
507         if t.haveInfo() {
508                 return true
509         }
510         if t.metadataCompletedChunks == nil {
511                 return false
512         }
513         for _, have := range t.metadataCompletedChunks {
514                 if !have {
515                         return false
516                 }
517         }
518         return true
519 }
520
521 // TODO: Propagate errors to disconnect peer.
522 func (t *Torrent) setMetadataSize(size int) (err error) {
523         if t.haveInfo() {
524                 // We already know the correct metadata size.
525                 return
526         }
527         if uint32(size) > maxMetadataSize {
528                 return errors.New("bad size")
529         }
530         if len(t.metadataBytes) == size {
531                 return
532         }
533         t.metadataBytes = make([]byte, size)
534         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
535         t.metadataChanged.Broadcast()
536         for c := range t.conns {
537                 c.requestPendingMetadata()
538         }
539         return
540 }
541
542 // The current working name for the torrent. Either the name in the info dict,
543 // or a display name given such as by the dn value in a magnet link, or "".
544 func (t *Torrent) name() string {
545         t.nameMu.RLock()
546         defer t.nameMu.RUnlock()
547         if t.haveInfo() {
548                 return t.info.BestName()
549         }
550         if t.displayName != "" {
551                 return t.displayName
552         }
553         return "infohash:" + t.infoHash.HexString()
554 }
555
556 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
557         p := &t.pieces[index]
558         ret.Priority = t.piecePriority(index)
559         ret.Completion = p.completion()
560         ret.QueuedForHash = p.queuedForHash()
561         ret.Hashing = p.hashing
562         ret.Checking = ret.QueuedForHash || ret.Hashing
563         ret.Marking = p.marking
564         if !ret.Complete && t.piecePartiallyDownloaded(index) {
565                 ret.Partial = true
566         }
567         return
568 }
569
570 func (t *Torrent) metadataPieceSize(piece int) int {
571         return metadataPieceSize(len(t.metadataBytes), piece)
572 }
573
574 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
575         return pp.Message{
576                 Type:       pp.Extended,
577                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
578                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
579                         Piece:     piece,
580                         TotalSize: len(t.metadataBytes),
581                         Type:      msgType,
582                 }), data...),
583         }
584 }
585
586 type pieceAvailabilityRun struct {
587         Count        pieceIndex
588         Availability int
589 }
590
591 func (me pieceAvailabilityRun) String() string {
592         return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
593 }
594
595 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
596         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
597                 ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
598         })
599         for i := range t.pieces {
600                 rle.Append(t.pieces[i].availability(), 1)
601         }
602         rle.Flush()
603         return
604 }
605
606 func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
607         freqs = make([]int, t.numActivePeers()+1)
608         for i := range t.pieces {
609                 freqs[t.piece(i).availability()]++
610         }
611         return
612 }
613
614 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
615         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
616                 ret = append(ret, PieceStateRun{
617                         PieceState: el.(PieceState),
618                         Length:     int(count),
619                 })
620         })
621         for index := range t.pieces {
622                 rle.Append(t.pieceState(pieceIndex(index)), 1)
623         }
624         rle.Flush()
625         return
626 }
627
628 // Produces a small string representing a PieceStateRun.
629 func (psr PieceStateRun) String() (ret string) {
630         ret = fmt.Sprintf("%d", psr.Length)
631         ret += func() string {
632                 switch psr.Priority {
633                 case PiecePriorityNext:
634                         return "N"
635                 case PiecePriorityNormal:
636                         return "."
637                 case PiecePriorityReadahead:
638                         return "R"
639                 case PiecePriorityNow:
640                         return "!"
641                 case PiecePriorityHigh:
642                         return "H"
643                 default:
644                         return ""
645                 }
646         }()
647         if psr.Hashing {
648                 ret += "H"
649         }
650         if psr.QueuedForHash {
651                 ret += "Q"
652         }
653         if psr.Marking {
654                 ret += "M"
655         }
656         if psr.Partial {
657                 ret += "P"
658         }
659         if psr.Complete {
660                 ret += "C"
661         }
662         if !psr.Ok {
663                 ret += "?"
664         }
665         return
666 }
667
668 func (t *Torrent) writeStatus(w io.Writer) {
669         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
670         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
671         if !t.haveInfo() {
672                 fmt.Fprintf(w, "Metadata have: ")
673                 for _, h := range t.metadataCompletedChunks {
674                         fmt.Fprintf(w, "%c", func() rune {
675                                 if h {
676                                         return 'H'
677                                 } else {
678                                         return '.'
679                                 }
680                         }())
681                 }
682                 fmt.Fprintln(w)
683         }
684         fmt.Fprintf(w, "Piece length: %s\n",
685                 func() string {
686                         if t.haveInfo() {
687                                 return fmt.Sprintf("%v (%v chunks)",
688                                         t.usualPieceSize(),
689                                         float64(t.usualPieceSize())/float64(t.chunkSize))
690                         } else {
691                                 return "no info"
692                         }
693                 }(),
694         )
695         if t.info != nil {
696                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
697                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
698                 // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
699                 // availability frequencies instead.
700                 if false {
701                         fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
702                                 for _, run := range t.pieceAvailabilityRuns() {
703                                         ret = append(ret, run.String())
704                                 }
705                                 return
706                         }(), " "))
707                 }
708                 fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
709                         func() (ret []string) {
710                                 for avail, freq := range t.pieceAvailabilityFrequencies() {
711                                         if freq == 0 {
712                                                 continue
713                                         }
714                                         ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
715                                 }
716                                 return
717                         }(),
718                         ", "))
719         }
720         fmt.Fprintf(w, "Reader Pieces:")
721         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
722                 fmt.Fprintf(w, " %d:%d", begin, end)
723                 return true
724         })
725         fmt.Fprintln(w)
726
727         fmt.Fprintf(w, "Enabled trackers:\n")
728         func() {
729                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
730                 fmt.Fprintf(tw, "    URL\tExtra\n")
731                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
732                         lu := l.URL()
733                         ru := r.URL()
734                         var luns, runs url.URL = *lu, *ru
735                         luns.Scheme = ""
736                         runs.Scheme = ""
737                         var ml missinggo.MultiLess
738                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
739                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
740                         return ml.Less()
741                 }).([]torrentTrackerAnnouncer) {
742                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
743                 }
744                 tw.Flush()
745         }()
746
747         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
748
749         spew.NewDefaultConfig()
750         spew.Fdump(w, t.statsLocked())
751
752         peers := t.peersAsSlice()
753         sort.Slice(peers, func(_i, _j int) bool {
754                 i := peers[_i]
755                 j := peers[_j]
756                 if less, ok := multiless.New().EagerSameLess(
757                         i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
758                 ).LessOk(); ok {
759                         return less
760                 }
761                 return worseConn(i, j)
762         })
763         for i, c := range peers {
764                 fmt.Fprintf(w, "%2d. ", i+1)
765                 c.writeStatus(w, t)
766         }
767 }
768
769 func (t *Torrent) haveInfo() bool {
770         return t.info != nil
771 }
772
773 // Returns a run-time generated MetaInfo that includes the info bytes and
774 // announce-list as currently known to the client.
775 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
776         return metainfo.MetaInfo{
777                 CreationDate: time.Now().Unix(),
778                 Comment:      "dynamic metainfo from client",
779                 CreatedBy:    "go.torrent",
780                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
781                 InfoBytes: func() []byte {
782                         if t.haveInfo() {
783                                 return t.metadataBytes
784                         } else {
785                                 return nil
786                         }
787                 }(),
788                 UrlList: func() []string {
789                         ret := make([]string, 0, len(t.webSeeds))
790                         for url := range t.webSeeds {
791                                 ret = append(ret, url)
792                         }
793                         return ret
794                 }(),
795         }
796 }
797
798 // Get bytes left
799 func (t *Torrent) BytesMissing() (n int64) {
800         t.cl.rLock()
801         n = t.bytesMissingLocked()
802         t.cl.rUnlock()
803         return
804 }
805
806 func (t *Torrent) bytesMissingLocked() int64 {
807         return t.bytesLeft()
808 }
809
810 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
811         roaring.Flip(b, 0, end).Iterate(cb)
812 }
813
814 func (t *Torrent) bytesLeft() (left int64) {
815         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
816                 p := t.piece(pieceIndex(x))
817                 left += int64(p.length() - p.numDirtyBytes())
818                 return true
819         })
820         return
821 }
822
823 // Bytes left to give in tracker announces.
824 func (t *Torrent) bytesLeftAnnounce() int64 {
825         if t.haveInfo() {
826                 return t.bytesLeft()
827         } else {
828                 return -1
829         }
830 }
831
832 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
833         if t.pieceComplete(piece) {
834                 return false
835         }
836         if t.pieceAllDirty(piece) {
837                 return false
838         }
839         return t.pieces[piece].hasDirtyChunks()
840 }
841
842 func (t *Torrent) usualPieceSize() int {
843         return int(t.info.PieceLength)
844 }
845
846 func (t *Torrent) numPieces() pieceIndex {
847         return pieceIndex(t.info.NumPieces())
848 }
849
850 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
851         return pieceIndex(t._completedPieces.GetCardinality())
852 }
853
854 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
855         if !t.closed.Set() {
856                 err = errors.New("already closed")
857                 return
858         }
859         if t.storage != nil {
860                 wg.Add(1)
861                 go func() {
862                         defer wg.Done()
863                         t.storageLock.Lock()
864                         defer t.storageLock.Unlock()
865                         if f := t.storage.Close; f != nil {
866                                 err1 := f()
867                                 if err1 != nil {
868                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
869                                 }
870                         }
871                 }()
872         }
873         t.iterPeers(func(p *Peer) {
874                 p.close()
875         })
876         if t.storage != nil {
877                 t.deletePieceRequestOrder()
878         }
879         for i := range t.pieces {
880                 p := t.piece(i)
881                 if p.relativeAvailability != 0 {
882                         panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
883                 }
884         }
885         t.pex.Reset()
886         t.cl.event.Broadcast()
887         t.pieceStateChanges.Close()
888         t.updateWantPeersEvent()
889         return
890 }
891
892 func (t *Torrent) requestOffset(r Request) int64 {
893         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
894 }
895
896 // Return the request that would include the given offset into the torrent data. Returns !ok if
897 // there is no such request.
898 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
899         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
900 }
901
902 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
903         defer perf.ScopeTimerErr(&err)()
904         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
905         if err == nil && n != len(data) {
906                 err = io.ErrShortWrite
907         }
908         return err
909 }
910
911 func (t *Torrent) bitfield() (bf []bool) {
912         bf = make([]bool, t.numPieces())
913         t._completedPieces.Iterate(func(piece uint32) (again bool) {
914                 bf[piece] = true
915                 return true
916         })
917         return
918 }
919
920 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
921         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
922 }
923
924 func (t *Torrent) chunksPerRegularPiece() chunkIndexType {
925         return t._chunksPerRegularPiece
926 }
927
928 func (t *Torrent) numChunks() RequestIndex {
929         if t.numPieces() == 0 {
930                 return 0
931         }
932         return RequestIndex(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
933 }
934
935 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
936         t.dirtyChunks.RemoveRange(
937                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
938                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
939 }
940
941 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
942         if t.info.PieceLength == 0 {
943                 // There will be no variance amongst pieces. Only pain.
944                 return 0
945         }
946         if piece == t.numPieces()-1 {
947                 ret := pp.Integer(*t.length % t.info.PieceLength)
948                 if ret != 0 {
949                         return ret
950                 }
951         }
952         return pp.Integer(t.info.PieceLength)
953 }
954
955 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
956         return &blockCheckingWriter{
957                 cache:        &t.smartBanCache,
958                 requestIndex: t.pieceRequestIndexOffset(piece),
959                 chunkSize:    t.chunkSize.Int(),
960         }
961 }
962
963 func (t *Torrent) hashPiece(piece pieceIndex) (
964         ret metainfo.Hash,
965         // These are peers that sent us blocks that differ from what we hash here.
966         differingPeers map[bannableAddr]struct{},
967         err error,
968 ) {
969         p := t.piece(piece)
970         p.waitNoPendingWrites()
971         storagePiece := t.pieces[piece].Storage()
972
973         // Does the backend want to do its own hashing?
974         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
975                 var sum metainfo.Hash
976                 // log.Printf("A piece decided to self-hash: %d", piece)
977                 sum, err = i.SelfHash()
978                 missinggo.CopyExact(&ret, sum)
979                 return
980         }
981
982         hash := pieceHash.New()
983         const logPieceContents = false
984         smartBanWriter := t.smartBanBlockCheckingWriter(piece)
985         writers := []io.Writer{hash, smartBanWriter}
986         var examineBuf bytes.Buffer
987         if logPieceContents {
988                 writers = append(writers, &examineBuf)
989         }
990         _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
991         if logPieceContents {
992                 t.logger.WithDefaultLevel(log.Debug).Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
993         }
994         smartBanWriter.Flush()
995         differingPeers = smartBanWriter.badPeers
996         missinggo.CopyExact(&ret, hash.Sum(nil))
997         return
998 }
999
1000 func (t *Torrent) haveAnyPieces() bool {
1001         return !t._completedPieces.IsEmpty()
1002 }
1003
1004 func (t *Torrent) haveAllPieces() bool {
1005         if !t.haveInfo() {
1006                 return false
1007         }
1008         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
1009 }
1010
1011 func (t *Torrent) havePiece(index pieceIndex) bool {
1012         return t.haveInfo() && t.pieceComplete(index)
1013 }
1014
1015 func (t *Torrent) maybeDropMutuallyCompletePeer(
1016         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
1017         // okay?
1018         p *Peer,
1019 ) {
1020         if !t.cl.config.DropMutuallyCompletePeers {
1021                 return
1022         }
1023         if !t.haveAllPieces() {
1024                 return
1025         }
1026         if all, known := p.peerHasAllPieces(); !(known && all) {
1027                 return
1028         }
1029         if p.useful() {
1030                 return
1031         }
1032         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
1033         p.drop()
1034 }
1035
1036 func (t *Torrent) haveChunk(r Request) (ret bool) {
1037         // defer func() {
1038         //      log.Println("have chunk", r, ret)
1039         // }()
1040         if !t.haveInfo() {
1041                 return false
1042         }
1043         if t.pieceComplete(pieceIndex(r.Index)) {
1044                 return true
1045         }
1046         p := &t.pieces[r.Index]
1047         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
1048 }
1049
1050 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
1051         return chunkIndexType(cs.Begin / chunkSize)
1052 }
1053
1054 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
1055         return t._pendingPieces.Contains(uint32(index))
1056 }
1057
1058 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1059 // conns (which is a map).
1060 var peerConnSlices sync.Pool
1061
1062 func getPeerConnSlice(cap int) []*PeerConn {
1063         getInterface := peerConnSlices.Get()
1064         if getInterface == nil {
1065                 return make([]*PeerConn, 0, cap)
1066         } else {
1067                 return getInterface.([]*PeerConn)[:0]
1068         }
1069 }
1070
1071 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1072 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1073 // established connections for more than a minute. This is O(n log n). If there was a way to not
1074 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1075 func (t *Torrent) worstBadConn() (ret *PeerConn) {
1076         wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
1077         defer peerConnSlices.Put(wcs.conns)
1078         wcs.initKeys()
1079         heap.Init(&wcs)
1080         for wcs.Len() != 0 {
1081                 c := heap.Pop(&wcs).(*PeerConn)
1082                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1083                         return c
1084                 }
1085                 // If the connection is in the worst half of the established
1086                 // connection quota and is older than a minute.
1087                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1088                         // Give connections 1 minute to prove themselves.
1089                         if time.Since(c.completedHandshake) > time.Minute {
1090                                 return c
1091                         }
1092                 }
1093         }
1094         return nil
1095 }
1096
1097 type PieceStateChange struct {
1098         Index int
1099         PieceState
1100 }
1101
1102 func (t *Torrent) publishPieceChange(piece pieceIndex) {
1103         t.cl._mu.Defer(func() {
1104                 cur := t.pieceState(piece)
1105                 p := &t.pieces[piece]
1106                 if cur != p.publicPieceState {
1107                         p.publicPieceState = cur
1108                         t.pieceStateChanges.Publish(PieceStateChange{
1109                                 int(piece),
1110                                 cur,
1111                         })
1112                 }
1113         })
1114 }
1115
1116 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1117         if t.pieceComplete(piece) {
1118                 return 0
1119         }
1120         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1121 }
1122
1123 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1124         return t.pieces[piece].allChunksDirty()
1125 }
1126
1127 func (t *Torrent) readersChanged() {
1128         t.updateReaderPieces()
1129         t.updateAllPiecePriorities("Torrent.readersChanged")
1130 }
1131
1132 func (t *Torrent) updateReaderPieces() {
1133         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1134 }
1135
1136 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1137         if from == to {
1138                 return
1139         }
1140         t.updateReaderPieces()
1141         // Order the ranges, high and low.
1142         l, h := from, to
1143         if l.begin > h.begin {
1144                 l, h = h, l
1145         }
1146         if l.end < h.begin {
1147                 // Two distinct ranges.
1148                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1149                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1150         } else {
1151                 // Ranges overlap.
1152                 end := l.end
1153                 if h.end > end {
1154                         end = h.end
1155                 }
1156                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1157         }
1158 }
1159
1160 func (t *Torrent) maybeNewConns() {
1161         // Tickle the accept routine.
1162         t.cl.event.Broadcast()
1163         t.openNewConns()
1164 }
1165
1166 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
1167         if t._pendingPieces.Contains(uint32(piece)) {
1168                 t.iterPeers(func(c *Peer) {
1169                         // if c.requestState.Interested {
1170                         //      return
1171                         // }
1172                         if !c.isLowOnRequests() {
1173                                 return
1174                         }
1175                         if !c.peerHasPiece(piece) {
1176                                 return
1177                         }
1178                         if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(piece) {
1179                                 return
1180                         }
1181                         c.updateRequests(reason)
1182                 })
1183         }
1184         t.maybeNewConns()
1185         t.publishPieceChange(piece)
1186 }
1187
1188 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1189         if !t.closed.IsSet() {
1190                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1191                 // request order.
1192                 t.updatePieceRequestOrder(piece)
1193         }
1194         p := &t.pieces[piece]
1195         newPrio := p.uncachedPriority()
1196         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1197         if newPrio == PiecePriorityNone {
1198                 if !t._pendingPieces.CheckedRemove(uint32(piece)) {
1199                         return
1200                 }
1201         } else {
1202                 if !t._pendingPieces.CheckedAdd(uint32(piece)) {
1203                         return
1204                 }
1205         }
1206         t.piecePriorityChanged(piece, reason)
1207 }
1208
1209 func (t *Torrent) updateAllPiecePriorities(reason string) {
1210         t.updatePiecePriorities(0, t.numPieces(), reason)
1211 }
1212
1213 // Update all piece priorities in one hit. This function should have the same
1214 // output as updatePiecePriority, but across all pieces.
1215 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1216         for i := begin; i < end; i++ {
1217                 t.updatePiecePriority(i, reason)
1218         }
1219 }
1220
1221 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1222 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1223         if off >= *t.length {
1224                 return
1225         }
1226         if off < 0 {
1227                 size += off
1228                 off = 0
1229         }
1230         if size <= 0 {
1231                 return
1232         }
1233         begin = pieceIndex(off / t.info.PieceLength)
1234         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1235         if end > pieceIndex(t.info.NumPieces()) {
1236                 end = pieceIndex(t.info.NumPieces())
1237         }
1238         return
1239 }
1240
1241 // Returns true if all iterations complete without breaking. Returns the read regions for all
1242 // readers. The reader regions should not be merged as some callers depend on this method to
1243 // enumerate readers.
1244 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1245         for r := range t.readers {
1246                 p := r.pieces
1247                 if p.begin >= p.end {
1248                         continue
1249                 }
1250                 if !f(p.begin, p.end) {
1251                         return false
1252                 }
1253         }
1254         return true
1255 }
1256
1257 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1258         return t.piece(piece).uncachedPriority()
1259 }
1260
1261 func (t *Torrent) pendRequest(req RequestIndex) {
1262         t.piece(t.pieceIndexOfRequestIndex(req)).pendChunkIndex(req % t.chunksPerRegularPiece())
1263 }
1264
1265 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1266         t.cl.event.Broadcast()
1267         if t.pieceComplete(piece) {
1268                 t.onPieceCompleted(piece)
1269         } else {
1270                 t.onIncompletePiece(piece)
1271         }
1272         t.updatePiecePriority(piece, reason)
1273 }
1274
1275 func (t *Torrent) numReceivedConns() (ret int) {
1276         for c := range t.conns {
1277                 if c.Discovery == PeerSourceIncoming {
1278                         ret++
1279                 }
1280         }
1281         return
1282 }
1283
1284 func (t *Torrent) maxHalfOpen() int {
1285         // Note that if we somehow exceed the maximum established conns, we want
1286         // the negative value to have an effect.
1287         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1288         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1289         // We want to allow some experimentation with new peers, and to try to
1290         // upset an oversupply of received connections.
1291         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1292 }
1293
1294 func (t *Torrent) openNewConns() (initiated int) {
1295         defer t.updateWantPeersEvent()
1296         for t.peers.Len() != 0 {
1297                 if !t.wantConns() {
1298                         return
1299                 }
1300                 if len(t.halfOpen) >= t.maxHalfOpen() {
1301                         return
1302                 }
1303                 if len(t.cl.dialers) == 0 {
1304                         return
1305                 }
1306                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1307                         return
1308                 }
1309                 p := t.peers.PopMax()
1310                 t.initiateConn(p)
1311                 initiated++
1312         }
1313         return
1314 }
1315
1316 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1317         p := t.piece(piece)
1318         uncached := t.pieceCompleteUncached(piece)
1319         cached := p.completion()
1320         changed := cached != uncached
1321         complete := uncached.Complete
1322         p.storageCompletionOk = uncached.Ok
1323         x := uint32(piece)
1324         if complete {
1325                 t._completedPieces.Add(x)
1326                 t.openNewConns()
1327         } else {
1328                 t._completedPieces.Remove(x)
1329         }
1330         p.t.updatePieceRequestOrder(piece)
1331         t.updateComplete()
1332         if complete && len(p.dirtiers) != 0 {
1333                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1334         }
1335         if changed {
1336                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger)
1337                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1338         }
1339         return changed
1340 }
1341
1342 // Non-blocking read. Client lock is not required.
1343 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1344         for len(b) != 0 {
1345                 p := &t.pieces[off/t.info.PieceLength]
1346                 p.waitNoPendingWrites()
1347                 var n1 int
1348                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1349                 if n1 == 0 {
1350                         break
1351                 }
1352                 off += int64(n1)
1353                 n += n1
1354                 b = b[n1:]
1355         }
1356         return
1357 }
1358
1359 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1360 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1361 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1362 func (t *Torrent) maybeCompleteMetadata() error {
1363         if t.haveInfo() {
1364                 // Nothing to do.
1365                 return nil
1366         }
1367         if !t.haveAllMetadataPieces() {
1368                 // Don't have enough metadata pieces.
1369                 return nil
1370         }
1371         err := t.setInfoBytesLocked(t.metadataBytes)
1372         if err != nil {
1373                 t.invalidateMetadata()
1374                 return fmt.Errorf("error setting info bytes: %s", err)
1375         }
1376         if t.cl.config.Debug {
1377                 t.logger.Printf("%s: got metadata from peers", t)
1378         }
1379         return nil
1380 }
1381
1382 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1383         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1384                 if end > begin {
1385                         now.Add(bitmap.BitIndex(begin))
1386                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1387                 }
1388                 return true
1389         })
1390         return
1391 }
1392
1393 func (t *Torrent) needData() bool {
1394         if t.closed.IsSet() {
1395                 return false
1396         }
1397         if !t.haveInfo() {
1398                 return true
1399         }
1400         return !t._pendingPieces.IsEmpty()
1401 }
1402
1403 func appendMissingStrings(old, new []string) (ret []string) {
1404         ret = old
1405 new:
1406         for _, n := range new {
1407                 for _, o := range old {
1408                         if o == n {
1409                                 continue new
1410                         }
1411                 }
1412                 ret = append(ret, n)
1413         }
1414         return
1415 }
1416
1417 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1418         ret = existing
1419         for minNumTiers > len(ret) {
1420                 ret = append(ret, nil)
1421         }
1422         return
1423 }
1424
1425 func (t *Torrent) addTrackers(announceList [][]string) {
1426         fullAnnounceList := &t.metainfo.AnnounceList
1427         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1428         for tierIndex, trackerURLs := range announceList {
1429                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1430         }
1431         t.startMissingTrackerScrapers()
1432         t.updateWantPeersEvent()
1433 }
1434
1435 // Don't call this before the info is available.
1436 func (t *Torrent) bytesCompleted() int64 {
1437         if !t.haveInfo() {
1438                 return 0
1439         }
1440         return *t.length - t.bytesLeft()
1441 }
1442
1443 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1444         t.cl.lock()
1445         defer t.cl.unlock()
1446         return t.setInfoBytesLocked(b)
1447 }
1448
1449 // Returns true if connection is removed from torrent.Conns.
1450 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1451         if !c.closed.IsSet() {
1452                 panic("connection is not closed")
1453                 // There are behaviours prevented by the closed state that will fail
1454                 // if the connection has been deleted.
1455         }
1456         _, ret = t.conns[c]
1457         delete(t.conns, c)
1458         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1459         // the drop event against the PexConnState instead.
1460         if ret {
1461                 if !t.cl.config.DisablePEX {
1462                         t.pex.Drop(c)
1463                 }
1464         }
1465         torrent.Add("deleted connections", 1)
1466         c.deleteAllRequests("Torrent.deletePeerConn")
1467         t.assertPendingRequests()
1468         if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
1469                 panic(t.connsWithAllPieces)
1470         }
1471         return
1472 }
1473
1474 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1475         if t.deleteConnWithAllPieces(p) {
1476                 return
1477         }
1478         if !t.haveInfo() {
1479                 return
1480         }
1481         p.peerPieces().Iterate(func(i uint32) bool {
1482                 p.t.decPieceAvailability(pieceIndex(i))
1483                 return true
1484         })
1485 }
1486
1487 func (t *Torrent) assertPendingRequests() {
1488         if !check {
1489                 return
1490         }
1491         // var actual pendingRequests
1492         // if t.haveInfo() {
1493         //      actual.m = make([]int, t.numChunks())
1494         // }
1495         // t.iterPeers(func(p *Peer) {
1496         //      p.requestState.Requests.Iterate(func(x uint32) bool {
1497         //              actual.Inc(x)
1498         //              return true
1499         //      })
1500         // })
1501         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1502         // if diff != "" {
1503         //      panic(diff)
1504         // }
1505 }
1506
1507 func (t *Torrent) dropConnection(c *PeerConn) {
1508         t.cl.event.Broadcast()
1509         c.close()
1510         if t.deletePeerConn(c) {
1511                 t.openNewConns()
1512         }
1513 }
1514
1515 // Peers as in contact information for dialing out.
1516 func (t *Torrent) wantPeers() bool {
1517         if t.closed.IsSet() {
1518                 return false
1519         }
1520         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1521                 return false
1522         }
1523         return t.wantConns()
1524 }
1525
1526 func (t *Torrent) updateWantPeersEvent() {
1527         if t.wantPeers() {
1528                 t.wantPeersEvent.Set()
1529         } else {
1530                 t.wantPeersEvent.Clear()
1531         }
1532 }
1533
1534 // Returns whether the client should make effort to seed the torrent.
1535 func (t *Torrent) seeding() bool {
1536         cl := t.cl
1537         if t.closed.IsSet() {
1538                 return false
1539         }
1540         if t.dataUploadDisallowed {
1541                 return false
1542         }
1543         if cl.config.NoUpload {
1544                 return false
1545         }
1546         if !cl.config.Seed {
1547                 return false
1548         }
1549         if cl.config.DisableAggressiveUpload && t.needData() {
1550                 return false
1551         }
1552         return true
1553 }
1554
1555 func (t *Torrent) onWebRtcConn(
1556         c datachannel.ReadWriteCloser,
1557         dcc webtorrent.DataChannelContext,
1558 ) {
1559         defer c.Close()
1560         netConn := webrtcNetConn{
1561                 ReadWriteCloser:    c,
1562                 DataChannelContext: dcc,
1563         }
1564         peerRemoteAddr := netConn.RemoteAddr()
1565         if t.cl.badPeerAddr(peerRemoteAddr) {
1566                 return
1567         }
1568         pc, err := t.cl.initiateProtocolHandshakes(
1569                 context.Background(),
1570                 netConn,
1571                 t,
1572                 dcc.LocalOffered,
1573                 false,
1574                 netConn.RemoteAddr(),
1575                 webrtcNetwork,
1576                 fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
1577         )
1578         if err != nil {
1579                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1580                 return
1581         }
1582         if dcc.LocalOffered {
1583                 pc.Discovery = PeerSourceTracker
1584         } else {
1585                 pc.Discovery = PeerSourceIncoming
1586         }
1587         pc.conn.SetWriteDeadline(time.Time{})
1588         t.cl.lock()
1589         defer t.cl.unlock()
1590         err = t.cl.runHandshookConn(pc, t)
1591         if err != nil {
1592                 t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
1593         }
1594 }
1595
1596 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1597         err := t.cl.runHandshookConn(pc, t)
1598         if err != nil || logAll {
1599                 t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
1600         }
1601 }
1602
1603 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1604         t.logRunHandshookConn(pc, false, log.Debug)
1605 }
1606
1607 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1608         wtc, release := t.cl.websocketTrackers.Get(u.String())
1609         go func() {
1610                 <-t.closed.Done()
1611                 release()
1612         }()
1613         wst := websocketTrackerStatus{u, wtc}
1614         go func() {
1615                 err := wtc.Announce(tracker.Started, t.infoHash)
1616                 if err != nil {
1617                         t.logger.WithDefaultLevel(log.Warning).Printf(
1618                                 "error in initial announce to %q: %v",
1619                                 u.String(), err,
1620                         )
1621                 }
1622         }()
1623         return wst
1624 }
1625
1626 func (t *Torrent) startScrapingTracker(_url string) {
1627         if _url == "" {
1628                 return
1629         }
1630         u, err := url.Parse(_url)
1631         if err != nil {
1632                 // URLs with a leading '*' appear to be a uTorrent convention to
1633                 // disable trackers.
1634                 if _url[0] != '*' {
1635                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1636                 }
1637                 return
1638         }
1639         if u.Scheme == "udp" {
1640                 u.Scheme = "udp4"
1641                 t.startScrapingTracker(u.String())
1642                 u.Scheme = "udp6"
1643                 t.startScrapingTracker(u.String())
1644                 return
1645         }
1646         if _, ok := t.trackerAnnouncers[_url]; ok {
1647                 return
1648         }
1649         sl := func() torrentTrackerAnnouncer {
1650                 switch u.Scheme {
1651                 case "ws", "wss":
1652                         if t.cl.config.DisableWebtorrent {
1653                                 return nil
1654                         }
1655                         return t.startWebsocketAnnouncer(*u)
1656                 case "udp4":
1657                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1658                                 return nil
1659                         }
1660                 case "udp6":
1661                         if t.cl.config.DisableIPv6 {
1662                                 return nil
1663                         }
1664                 }
1665                 newAnnouncer := &trackerScraper{
1666                         u:               *u,
1667                         t:               t,
1668                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1669                 }
1670                 go newAnnouncer.Run()
1671                 return newAnnouncer
1672         }()
1673         if sl == nil {
1674                 return
1675         }
1676         if t.trackerAnnouncers == nil {
1677                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1678         }
1679         t.trackerAnnouncers[_url] = sl
1680 }
1681
1682 // Adds and starts tracker scrapers for tracker URLs that aren't already
1683 // running.
1684 func (t *Torrent) startMissingTrackerScrapers() {
1685         if t.cl.config.DisableTrackers {
1686                 return
1687         }
1688         t.startScrapingTracker(t.metainfo.Announce)
1689         for _, tier := range t.metainfo.AnnounceList {
1690                 for _, url := range tier {
1691                         t.startScrapingTracker(url)
1692                 }
1693         }
1694 }
1695
1696 // Returns an AnnounceRequest with fields filled out to defaults and current
1697 // values.
1698 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1699         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1700         // dependent on the network in use.
1701         return tracker.AnnounceRequest{
1702                 Event: event,
1703                 NumWant: func() int32 {
1704                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1705                                 return -1
1706                         } else {
1707                                 return 0
1708                         }
1709                 }(),
1710                 Port:     uint16(t.cl.incomingPeerPort()),
1711                 PeerId:   t.cl.peerID,
1712                 InfoHash: t.infoHash,
1713                 Key:      t.cl.announceKey(),
1714
1715                 // The following are vaguely described in BEP 3.
1716
1717                 Left:     t.bytesLeftAnnounce(),
1718                 Uploaded: t.stats.BytesWrittenData.Int64(),
1719                 // There's no mention of wasted or unwanted download in the BEP.
1720                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1721         }
1722 }
1723
1724 // Adds peers revealed in an announce until the announce ends, or we have
1725 // enough peers.
1726 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1727         cl := t.cl
1728         for v := range pvs {
1729                 cl.lock()
1730                 added := 0
1731                 for _, cp := range v.Peers {
1732                         if cp.Port == 0 {
1733                                 // Can't do anything with this.
1734                                 continue
1735                         }
1736                         if t.addPeer(PeerInfo{
1737                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1738                                 Source: PeerSourceDhtGetPeers,
1739                         }) {
1740                                 added++
1741                         }
1742                 }
1743                 cl.unlock()
1744                 // if added != 0 {
1745                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1746                 // }
1747         }
1748 }
1749
1750 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1751 // announce ends. stop will force the announce to end.
1752 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1753         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1754         if err != nil {
1755                 return
1756         }
1757         _done := make(chan struct{})
1758         done = _done
1759         stop = ps.Close
1760         go func() {
1761                 t.consumeDhtAnnouncePeers(ps.Peers())
1762                 close(_done)
1763         }()
1764         return
1765 }
1766
1767 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1768         _, stop, err := t.AnnounceToDht(s)
1769         if err != nil {
1770                 return err
1771         }
1772         select {
1773         case <-t.closed.Done():
1774         case <-time.After(5 * time.Minute):
1775         }
1776         stop()
1777         return nil
1778 }
1779
1780 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1781         cl := t.cl
1782         cl.lock()
1783         defer cl.unlock()
1784         for {
1785                 for {
1786                         if t.closed.IsSet() {
1787                                 return
1788                         }
1789                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
1790                         // TODO: We can include the announce_peer step depending on whether we can receive
1791                         // inbound connections. We should probably only announce once every 15 mins too.
1792                         if !t.wantConns() {
1793                                 goto wait
1794                         }
1795                         // TODO: Determine if there's a listener on the port we're announcing.
1796                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1797                                 goto wait
1798                         }
1799                         break
1800                 wait:
1801                         cl.event.Wait()
1802                 }
1803                 func() {
1804                         t.numDHTAnnounces++
1805                         cl.unlock()
1806                         defer cl.lock()
1807                         err := t.timeboxedAnnounceToDht(s)
1808                         if err != nil {
1809                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1810                         }
1811                 }()
1812         }
1813 }
1814
1815 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1816         for _, p := range peers {
1817                 if t.addPeer(p) {
1818                         added++
1819                 }
1820         }
1821         return
1822 }
1823
1824 // The returned TorrentStats may require alignment in memory. See
1825 // https://github.com/anacrolix/torrent/issues/383.
1826 func (t *Torrent) Stats() TorrentStats {
1827         t.cl.rLock()
1828         defer t.cl.rUnlock()
1829         return t.statsLocked()
1830 }
1831
1832 func (t *Torrent) statsLocked() (ret TorrentStats) {
1833         ret.ActivePeers = len(t.conns)
1834         ret.HalfOpenPeers = len(t.halfOpen)
1835         ret.PendingPeers = t.peers.Len()
1836         ret.TotalPeers = t.numTotalPeers()
1837         ret.ConnectedSeeders = 0
1838         for c := range t.conns {
1839                 if all, ok := c.peerHasAllPieces(); all && ok {
1840                         ret.ConnectedSeeders++
1841                 }
1842         }
1843         ret.ConnStats = t.stats.Copy()
1844         ret.PiecesComplete = t.numPiecesCompleted()
1845         return
1846 }
1847
1848 // The total number of peers in the torrent.
1849 func (t *Torrent) numTotalPeers() int {
1850         peers := make(map[string]struct{})
1851         for conn := range t.conns {
1852                 ra := conn.conn.RemoteAddr()
1853                 if ra == nil {
1854                         // It's been closed and doesn't support RemoteAddr.
1855                         continue
1856                 }
1857                 peers[ra.String()] = struct{}{}
1858         }
1859         for addr := range t.halfOpen {
1860                 peers[addr] = struct{}{}
1861         }
1862         t.peers.Each(func(peer PeerInfo) {
1863                 peers[peer.Addr.String()] = struct{}{}
1864         })
1865         return len(peers)
1866 }
1867
1868 // Reconcile bytes transferred before connection was associated with a
1869 // torrent.
1870 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1871         if c._stats != (ConnStats{
1872                 // Handshakes should only increment these fields:
1873                 BytesWritten: c._stats.BytesWritten,
1874                 BytesRead:    c._stats.BytesRead,
1875         }) {
1876                 panic("bad stats")
1877         }
1878         c.postHandshakeStats(func(cs *ConnStats) {
1879                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1880                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1881         })
1882         c.reconciledHandshakeStats = true
1883 }
1884
1885 // Returns true if the connection is added.
1886 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1887         defer func() {
1888                 if err == nil {
1889                         torrent.Add("added connections", 1)
1890                 }
1891         }()
1892         if t.closed.IsSet() {
1893                 return errors.New("torrent closed")
1894         }
1895         for c0 := range t.conns {
1896                 if c.PeerID != c0.PeerID {
1897                         continue
1898                 }
1899                 if !t.cl.config.DropDuplicatePeerIds {
1900                         continue
1901                 }
1902                 if c.hasPreferredNetworkOver(c0) {
1903                         c0.close()
1904                         t.deletePeerConn(c0)
1905                 } else {
1906                         return errors.New("existing connection preferred")
1907                 }
1908         }
1909         if len(t.conns) >= t.maxEstablishedConns {
1910                 c := t.worstBadConn()
1911                 if c == nil {
1912                         return errors.New("don't want conns")
1913                 }
1914                 c.close()
1915                 t.deletePeerConn(c)
1916         }
1917         if len(t.conns) >= t.maxEstablishedConns {
1918                 panic(len(t.conns))
1919         }
1920         t.conns[c] = struct{}{}
1921         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
1922                 t.pex.Add(c) // as no further extended handshake expected
1923         }
1924         return nil
1925 }
1926
1927 func (t *Torrent) wantConns() bool {
1928         if !t.networkingEnabled.Bool() {
1929                 return false
1930         }
1931         if t.closed.IsSet() {
1932                 return false
1933         }
1934         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
1935                 return false
1936         }
1937         return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
1938 }
1939
1940 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1941         t.cl.lock()
1942         defer t.cl.unlock()
1943         oldMax = t.maxEstablishedConns
1944         t.maxEstablishedConns = max
1945         wcs := worseConnSlice{
1946                 conns: t.appendConns(nil, func(*PeerConn) bool {
1947                         return true
1948                 }),
1949         }
1950         wcs.initKeys()
1951         heap.Init(&wcs)
1952         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1953                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
1954         }
1955         t.openNewConns()
1956         return oldMax
1957 }
1958
1959 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
1960         t.logger.LazyLog(log.Debug, func() log.Msg {
1961                 return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
1962         })
1963         p := t.piece(piece)
1964         p.numVerifies++
1965         t.cl.event.Broadcast()
1966         if t.closed.IsSet() {
1967                 return
1968         }
1969
1970         // Don't score the first time a piece is hashed, it could be an initial check.
1971         if p.storageCompletionOk {
1972                 if passed {
1973                         pieceHashedCorrect.Add(1)
1974                 } else {
1975                         log.Fmsg(
1976                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
1977                         ).AddValues(t, p).LogLevel(
1978
1979                                 log.Debug, t.logger)
1980
1981                         pieceHashedNotCorrect.Add(1)
1982                 }
1983         }
1984
1985         p.marking = true
1986         t.publishPieceChange(piece)
1987         defer func() {
1988                 p.marking = false
1989                 t.publishPieceChange(piece)
1990         }()
1991
1992         if passed {
1993                 if len(p.dirtiers) != 0 {
1994                         // Don't increment stats above connection-level for every involved connection.
1995                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1996                 }
1997                 for c := range p.dirtiers {
1998                         c._stats.incrementPiecesDirtiedGood()
1999                 }
2000                 t.clearPieceTouchers(piece)
2001                 t.cl.unlock()
2002                 err := p.Storage().MarkComplete()
2003                 if err != nil {
2004                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
2005                 }
2006                 t.cl.lock()
2007
2008                 if t.closed.IsSet() {
2009                         return
2010                 }
2011                 t.pendAllChunkSpecs(piece)
2012         } else {
2013                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
2014                         // Peers contributed to all the data for this piece hash failure, and the failure was
2015                         // not due to errors in the storage (such as data being dropped in a cache).
2016
2017                         // Increment Torrent and above stats, and then specific connections.
2018                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
2019                         for c := range p.dirtiers {
2020                                 // Y u do dis peer?!
2021                                 c.stats().incrementPiecesDirtiedBad()
2022                         }
2023
2024                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
2025                         for c := range p.dirtiers {
2026                                 if !c.trusted {
2027                                         bannableTouchers = append(bannableTouchers, c)
2028                                 }
2029                         }
2030                         t.clearPieceTouchers(piece)
2031                         slices.Sort(bannableTouchers, connLessTrusted)
2032
2033                         if t.cl.config.Debug {
2034                                 t.logger.Printf(
2035                                         "bannable conns by trust for piece %d: %v",
2036                                         piece,
2037                                         func() (ret []connectionTrust) {
2038                                                 for _, c := range bannableTouchers {
2039                                                         ret = append(ret, c.trust())
2040                                                 }
2041                                                 return
2042                                         }(),
2043                                 )
2044                         }
2045
2046                         if len(bannableTouchers) >= 1 {
2047                                 c := bannableTouchers[0]
2048                                 if len(bannableTouchers) != 1 {
2049                                         t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
2050                                 } else {
2051                                         // Turns out it's still useful to ban peers like this because if there's only a
2052                                         // single peer for a piece, and we never progress that piece to completion, we
2053                                         // will never smart-ban them. Discovered in
2054                                         // https://github.com/anacrolix/torrent/issues/715.
2055                                         t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
2056                                         c.ban()
2057                                 }
2058                         }
2059                 }
2060                 t.onIncompletePiece(piece)
2061                 p.Storage().MarkNotComplete()
2062         }
2063         t.updatePieceCompletion(piece)
2064 }
2065
2066 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
2067         start := t.pieceRequestIndexOffset(piece)
2068         end := start + t.pieceNumChunks(piece)
2069         for ri := start; ri < end; ri++ {
2070                 t.cancelRequest(ri)
2071         }
2072 }
2073
2074 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
2075         t.pendAllChunkSpecs(piece)
2076         t.cancelRequestsForPiece(piece)
2077         t.piece(piece).readerCond.Broadcast()
2078         for conn := range t.conns {
2079                 conn.have(piece)
2080                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
2081         }
2082 }
2083
2084 // Called when a piece is found to be not complete.
2085 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2086         if t.pieceAllDirty(piece) {
2087                 t.pendAllChunkSpecs(piece)
2088         }
2089         if !t.wantPieceIndex(piece) {
2090                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2091                 return
2092         }
2093         // We could drop any connections that we told we have a piece that we
2094         // don't here. But there's a test failure, and it seems clients don't care
2095         // if you request pieces that you already claim to have. Pruning bad
2096         // connections might just remove any connections that aren't treating us
2097         // favourably anyway.
2098
2099         // for c := range t.conns {
2100         //      if c.sentHave(piece) {
2101         //              c.drop()
2102         //      }
2103         // }
2104         t.iterPeers(func(conn *Peer) {
2105                 if conn.peerHasPiece(piece) {
2106                         conn.updateRequests("piece incomplete")
2107                 }
2108         })
2109 }
2110
2111 func (t *Torrent) tryCreateMorePieceHashers() {
2112         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
2113         }
2114 }
2115
2116 func (t *Torrent) tryCreatePieceHasher() bool {
2117         if t.storage == nil {
2118                 return false
2119         }
2120         pi, ok := t.getPieceToHash()
2121         if !ok {
2122                 return false
2123         }
2124         p := t.piece(pi)
2125         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2126         p.hashing = true
2127         t.publishPieceChange(pi)
2128         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2129         t.storageLock.RLock()
2130         t.activePieceHashes++
2131         go t.pieceHasher(pi)
2132         return true
2133 }
2134
2135 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2136         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2137                 if t.piece(i).hashing {
2138                         return true
2139                 }
2140                 ret = i
2141                 ok = true
2142                 return false
2143         })
2144         return
2145 }
2146
2147 func (t *Torrent) dropBannedPeers() {
2148         t.iterPeers(func(p *Peer) {
2149                 remoteIp := p.remoteIp()
2150                 if remoteIp == nil {
2151                         if p.bannableAddr.Ok() {
2152                                 t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
2153                         }
2154                         return
2155                 }
2156                 netipAddr := netip.MustParseAddr(remoteIp.String())
2157                 if Some(netipAddr) != p.bannableAddr {
2158                         t.logger.WithDefaultLevel(log.Debug).Printf(
2159                                 "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
2160                                 p, remoteIp, p.bannableAddr)
2161                 }
2162                 if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
2163                         // Should this be a close?
2164                         p.drop()
2165                         t.logger.WithDefaultLevel(log.Debug).Printf("dropped %v for banned remote IP %v", p, netipAddr)
2166                 }
2167         })
2168 }
2169
2170 func (t *Torrent) pieceHasher(index pieceIndex) {
2171         p := t.piece(index)
2172         sum, failedPeers, copyErr := t.hashPiece(index)
2173         correct := sum == *p.hash
2174         switch copyErr {
2175         case nil, io.EOF:
2176         default:
2177                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2178         }
2179         t.storageLock.RUnlock()
2180         t.cl.lock()
2181         defer t.cl.unlock()
2182         if correct {
2183                 for peer := range failedPeers {
2184                         t.cl.banPeerIP(peer.AsSlice())
2185                         t.logger.WithDefaultLevel(log.Debug).Printf("smart banned %v for piece %v", peer, index)
2186                 }
2187                 t.dropBannedPeers()
2188                 for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
2189                         t.smartBanCache.ForgetBlock(ri)
2190                 }
2191         }
2192         p.hashing = false
2193         t.pieceHashed(index, correct, copyErr)
2194         t.updatePiecePriority(index, "Torrent.pieceHasher")
2195         t.activePieceHashes--
2196         t.tryCreateMorePieceHashers()
2197 }
2198
2199 // Return the connections that touched a piece, and clear the entries while doing it.
2200 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2201         p := t.piece(pi)
2202         for c := range p.dirtiers {
2203                 delete(c.peerTouchedPieces, pi)
2204                 delete(p.dirtiers, c)
2205         }
2206 }
2207
2208 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2209         t.iterPeers(func(p *Peer) {
2210                 ret = append(ret, p)
2211         })
2212         return
2213 }
2214
2215 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2216         piece := t.piece(pieceIndex)
2217         if piece.queuedForHash() {
2218                 return
2219         }
2220         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2221         t.publishPieceChange(pieceIndex)
2222         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2223         t.tryCreateMorePieceHashers()
2224 }
2225
2226 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2227 // before the Info is available.
2228 func (t *Torrent) VerifyData() {
2229         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2230                 t.Piece(i).VerifyData()
2231         }
2232 }
2233
2234 // Start the process of connecting to the given peer for the given torrent if appropriate.
2235 func (t *Torrent) initiateConn(peer PeerInfo) {
2236         if peer.Id == t.cl.peerID {
2237                 return
2238         }
2239         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2240                 return
2241         }
2242         addr := peer.Addr
2243         if t.addrActive(addr.String()) {
2244                 return
2245         }
2246         t.cl.numHalfOpen++
2247         t.halfOpen[addr.String()] = peer
2248         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
2249 }
2250
2251 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2252 // quickly make one Client visible to the Torrent of another Client.
2253 func (t *Torrent) AddClientPeer(cl *Client) int {
2254         return t.AddPeers(func() (ps []PeerInfo) {
2255                 for _, la := range cl.ListenAddrs() {
2256                         ps = append(ps, PeerInfo{
2257                                 Addr:    la,
2258                                 Trusted: true,
2259                         })
2260                 }
2261                 return
2262         }())
2263 }
2264
2265 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2266 // connection.
2267 func (t *Torrent) allStats(f func(*ConnStats)) {
2268         f(&t.stats)
2269         f(&t.cl.stats)
2270 }
2271
2272 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2273         return t.pieces[i].hashing
2274 }
2275
2276 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2277         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2278 }
2279
2280 func (t *Torrent) dialTimeout() time.Duration {
2281         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2282 }
2283
2284 func (t *Torrent) piece(i int) *Piece {
2285         return &t.pieces[i]
2286 }
2287
2288 func (t *Torrent) onWriteChunkErr(err error) {
2289         if t.userOnWriteChunkErr != nil {
2290                 go t.userOnWriteChunkErr(err)
2291                 return
2292         }
2293         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2294         t.disallowDataDownloadLocked()
2295 }
2296
2297 func (t *Torrent) DisallowDataDownload() {
2298         t.disallowDataDownloadLocked()
2299 }
2300
2301 func (t *Torrent) disallowDataDownloadLocked() {
2302         t.dataDownloadDisallowed.Set()
2303 }
2304
2305 func (t *Torrent) AllowDataDownload() {
2306         t.dataDownloadDisallowed.Clear()
2307 }
2308
2309 // Enables uploading data, if it was disabled.
2310 func (t *Torrent) AllowDataUpload() {
2311         t.cl.lock()
2312         defer t.cl.unlock()
2313         t.dataUploadDisallowed = false
2314         for c := range t.conns {
2315                 c.updateRequests("allow data upload")
2316         }
2317 }
2318
2319 // Disables uploading data, if it was enabled.
2320 func (t *Torrent) DisallowDataUpload() {
2321         t.cl.lock()
2322         defer t.cl.unlock()
2323         t.dataUploadDisallowed = true
2324         for c := range t.conns {
2325                 // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
2326                 c.updateRequests("disallow data upload")
2327         }
2328 }
2329
2330 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2331 // or if nil, a critical message is logged, and data download is disabled.
2332 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2333         t.cl.lock()
2334         defer t.cl.unlock()
2335         t.userOnWriteChunkErr = f
2336 }
2337
2338 func (t *Torrent) iterPeers(f func(p *Peer)) {
2339         for pc := range t.conns {
2340                 f(&pc.Peer)
2341         }
2342         for _, ws := range t.webSeeds {
2343                 f(ws)
2344         }
2345 }
2346
2347 func (t *Torrent) callbacks() *Callbacks {
2348         return &t.cl.config.Callbacks
2349 }
2350
2351 type AddWebSeedsOpt func(*webseed.Client)
2352
2353 // Sets the WebSeed trailing path escaper for a webseed.Client.
2354 func WebSeedPathEscaper(custom webseed.PathEscaper) AddWebSeedsOpt {
2355         return func(c *webseed.Client) {
2356                 c.PathEscaper = custom
2357         }
2358 }
2359
2360 func (t *Torrent) AddWebSeeds(urls []string, opts ...AddWebSeedsOpt) {
2361         t.cl.lock()
2362         defer t.cl.unlock()
2363         for _, u := range urls {
2364                 t.addWebSeed(u, opts...)
2365         }
2366 }
2367
2368 func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
2369         if t.cl.config.DisableWebseeds {
2370                 return
2371         }
2372         if _, ok := t.webSeeds[url]; ok {
2373                 return
2374         }
2375         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2376         // right away. This value should be some multiple of the number of connections to a host. I
2377         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2378         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2379         // "https://webtorrent.io/torrents/".
2380         const maxRequests = 16
2381         ws := webseedPeer{
2382                 peer: Peer{
2383                         t:                        t,
2384                         outgoing:                 true,
2385                         Network:                  "http",
2386                         reconciledHandshakeStats: true,
2387                         // This should affect how often we have to recompute requests for this peer. Note that
2388                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2389                         // requests mark more often, so recomputation is probably sooner than with regular peer
2390                         // conns. ~4x maxRequests would be about right.
2391                         PeerMaxRequests: 128,
2392                         // TODO: Set ban prefix?
2393                         RemoteAddr: remoteAddrFromUrl(url),
2394                         callbacks:  t.callbacks(),
2395                 },
2396                 client: webseed.Client{
2397                         HttpClient: t.cl.httpClient,
2398                         Url:        url,
2399                         ResponseBodyWrapper: func(r io.Reader) io.Reader {
2400                                 return &rateLimitedReader{
2401                                         l: t.cl.config.DownloadRateLimiter,
2402                                         r: r,
2403                                 }
2404                         },
2405                 },
2406                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2407                 maxRequests:    maxRequests,
2408         }
2409         ws.peer.initRequestState()
2410         for _, opt := range opts {
2411                 opt(&ws.client)
2412         }
2413         ws.peer.initUpdateRequestsTimer()
2414         ws.requesterCond.L = t.cl.locker()
2415         for i := 0; i < maxRequests; i += 1 {
2416                 go ws.requester(i)
2417         }
2418         for _, f := range t.callbacks().NewPeer {
2419                 f(&ws.peer)
2420         }
2421         ws.peer.logger = t.logger.WithContextValue(&ws)
2422         ws.peer.peerImpl = &ws
2423         if t.haveInfo() {
2424                 ws.onGotInfo(t.info)
2425         }
2426         t.webSeeds[url] = &ws.peer
2427 }
2428
2429 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2430         t.iterPeers(func(p1 *Peer) {
2431                 if p1 == p {
2432                         active = true
2433                 }
2434         })
2435         return
2436 }
2437
2438 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2439         index := t.pieceIndexOfRequestIndex(ri)
2440         return Request{
2441                 pp.Integer(index),
2442                 t.piece(index).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2443         }
2444 }
2445
2446 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2447         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + RequestIndex(r.Begin/t.chunkSize)
2448 }
2449
2450 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2451         return RequestIndex(piece) * t.chunksPerRegularPiece()
2452 }
2453
2454 func (t *Torrent) updateComplete() {
2455         t.Complete.SetBool(t.haveAllPieces())
2456 }
2457
2458 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
2459         p := t.requestingPeer(r)
2460         if p != nil {
2461                 p.cancel(r)
2462         }
2463         // TODO: This is a check that an old invariant holds. It can be removed after some testing.
2464         //delete(t.pendingRequests, r)
2465         var zeroRequestState requestState
2466         if t.requestState[r] != zeroRequestState {
2467                 panic("expected request state to be gone")
2468         }
2469         return p
2470 }
2471
2472 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
2473         return t.requestState[r].peer
2474 }
2475
2476 func (t *Torrent) addConnWithAllPieces(p *Peer) {
2477         if t.connsWithAllPieces == nil {
2478                 t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
2479         }
2480         t.connsWithAllPieces[p] = struct{}{}
2481 }
2482
2483 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
2484         _, ok := t.connsWithAllPieces[p]
2485         delete(t.connsWithAllPieces, p)
2486         return ok
2487 }
2488
2489 func (t *Torrent) numActivePeers() int {
2490         return len(t.conns) + len(t.webSeeds)
2491 }
2492
2493 func (t *Torrent) hasStorageCap() bool {
2494         f := t.storage.Capacity
2495         if f == nil {
2496                 return false
2497         }
2498         _, ok := (*f)()
2499         return ok
2500 }
2501
2502 func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
2503         return pieceIndex(ri / t.chunksPerRegularPiece())
2504 }
2505
2506 type requestState struct {
2507         peer *Peer
2508         when time.Time
2509 }