]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Merge branch 'master' into crawshaw-386-wasm
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "net/netip"
12         "net/url"
13         "sort"
14         "strings"
15         "text/tabwriter"
16         "time"
17         "unsafe"
18
19         "github.com/RoaringBitmap/roaring"
20         "github.com/anacrolix/chansync"
21         "github.com/anacrolix/chansync/events"
22         "github.com/anacrolix/dht/v2"
23         . "github.com/anacrolix/generics"
24         "github.com/anacrolix/log"
25         "github.com/anacrolix/missinggo/perf"
26         "github.com/anacrolix/missinggo/slices"
27         "github.com/anacrolix/missinggo/v2"
28         "github.com/anacrolix/missinggo/v2/bitmap"
29         "github.com/anacrolix/missinggo/v2/pubsub"
30         "github.com/anacrolix/multiless"
31         "github.com/anacrolix/sync"
32         request_strategy "github.com/anacrolix/torrent/request-strategy"
33         "github.com/davecgh/go-spew/spew"
34         "github.com/pion/datachannel"
35
36         "github.com/anacrolix/torrent/bencode"
37         "github.com/anacrolix/torrent/common"
38         "github.com/anacrolix/torrent/metainfo"
39         pp "github.com/anacrolix/torrent/peer_protocol"
40         "github.com/anacrolix/torrent/segments"
41         "github.com/anacrolix/torrent/storage"
42         "github.com/anacrolix/torrent/tracker"
43         "github.com/anacrolix/torrent/webseed"
44         "github.com/anacrolix/torrent/webtorrent"
45 )
46
47 // Maintains state of torrent within a Client. Many methods should not be called before the info is
48 // available, see .Info and .GotInfo.
49 type Torrent struct {
50         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
51         // alignment. See #262.
52         stats  ConnStats
53         cl     *Client
54         logger log.Logger
55
56         networkingEnabled      chansync.Flag
57         dataDownloadDisallowed chansync.Flag
58         dataUploadDisallowed   bool
59         userOnWriteChunkErr    func(error)
60
61         closed   chansync.SetOnce
62         infoHash metainfo.Hash
63         pieces   []Piece
64         // Values are the piece indices that changed.
65         pieceStateChanges pubsub.PubSub[PieceStateChange]
66         // The size of chunks to request from peers over the wire. This is
67         // normally 16KiB by convention these days.
68         chunkSize pp.Integer
69         chunkPool sync.Pool
70         // Total length of the torrent in bytes. Stored because it's not O(1) to
71         // get this from the info dict.
72         length *int64
73
74         // The storage to open when the info dict becomes available.
75         storageOpener *storage.Client
76         // Storage for torrent data.
77         storage *storage.Torrent
78         // Read-locked for using storage, and write-locked for Closing.
79         storageLock sync.RWMutex
80
81         // TODO: Only announce stuff is used?
82         metainfo metainfo.MetaInfo
83
84         // The info dict. nil if we don't have it (yet).
85         info      *metainfo.Info
86         fileIndex segments.Index
87         files     *[]*File
88
89         webSeeds map[string]*Peer
90         // Active peer connections, running message stream loops. TODO: Make this
91         // open (not-closed) connections only.
92         conns               map[*PeerConn]struct{}
93         maxEstablishedConns int
94         // Set of addrs to which we're attempting to connect. Connections are
95         // half-open until all handshakes are completed.
96         halfOpen map[string]PeerInfo
97
98         // Reserve of peers to connect to. A peer can be both here and in the
99         // active connections if were told about the peer after connecting with
100         // them. That encourages us to reconnect to peers that are well known in
101         // the swarm.
102         peers prioritizedPeers
103         // Whether we want to know to know more peers.
104         wantPeersEvent missinggo.Event
105         // An announcer for each tracker URL.
106         trackerAnnouncers map[string]torrentTrackerAnnouncer
107         // How many times we've initiated a DHT announce. TODO: Move into stats.
108         numDHTAnnounces int
109
110         // Name used if the info name isn't available. Should be cleared when the
111         // Info does become available.
112         nameMu      sync.RWMutex
113         displayName string
114
115         // The bencoded bytes of the info dict. This is actively manipulated if
116         // the info bytes aren't initially available, and we try to fetch them
117         // from peers.
118         metadataBytes []byte
119         // Each element corresponds to the 16KiB metadata pieces. If true, we have
120         // received that piece.
121         metadataCompletedChunks []bool
122         metadataChanged         sync.Cond
123
124         // Closed when .Info is obtained.
125         gotMetainfoC chan struct{}
126
127         readers                map[*reader]struct{}
128         _readerNowPieces       bitmap.Bitmap
129         _readerReadaheadPieces bitmap.Bitmap
130
131         // A cache of pieces we need to get. Calculated from various piece and
132         // file priorities and completion states elsewhere.
133         _pendingPieces roaring.Bitmap
134         // A cache of completed piece indices.
135         _completedPieces roaring.Bitmap
136         // Pieces that need to be hashed.
137         piecesQueuedForHash       bitmap.Bitmap
138         activePieceHashes         int
139         initialPieceCheckDisabled bool
140
141         connsWithAllPieces map[*Peer]struct{}
142         // Count of each request across active connections.
143         pendingRequests map[RequestIndex]*Peer
144         lastRequested   map[RequestIndex]time.Time
145         // Chunks we've written to since the corresponding piece was last checked.
146         dirtyChunks roaring.Bitmap
147
148         pex pexState
149
150         // Is On when all pieces are complete.
151         Complete chansync.Flag
152
153         // Torrent sources in use keyed by the source string.
154         activeSources sync.Map
155         sourcesLogger log.Logger
156         
157         smartBanCache smartBanCache
158 }
159
160 func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
161         // This could be done with roaring.BitSliceIndexing.
162         t.iterPeers(func(peer *Peer) {
163                 if _, ok := t.connsWithAllPieces[peer]; ok {
164                         return
165                 }
166                 if peer.peerHasPiece(i) {
167                         count++
168                 }
169         })
170         return
171 }
172
173 func (t *Torrent) decPieceAvailability(i pieceIndex) {
174         if !t.haveInfo() {
175                 return
176         }
177         p := t.piece(i)
178         if p.relativeAvailability <= 0 {
179                 panic(p.relativeAvailability)
180         }
181         p.relativeAvailability--
182         t.updatePieceRequestOrder(i)
183 }
184
185 func (t *Torrent) incPieceAvailability(i pieceIndex) {
186         // If we don't the info, this should be reconciled when we do.
187         if t.haveInfo() {
188                 p := t.piece(i)
189                 p.relativeAvailability++
190                 t.updatePieceRequestOrder(i)
191         }
192 }
193
194 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
195         return t._readerNowPieces
196 }
197
198 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
199         return t._readerReadaheadPieces
200 }
201
202 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
203         return !t.wantPieceIndex(i)
204 }
205
206 // Returns a channel that is closed when the Torrent is closed.
207 func (t *Torrent) Closed() events.Done {
208         return t.closed.Done()
209 }
210
211 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
212 // pending, and half-open peers.
213 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
214         // Add pending peers to the list
215         t.peers.Each(func(peer PeerInfo) {
216                 ks = append(ks, peer)
217         })
218
219         // Add half-open peers to the list
220         for _, peer := range t.halfOpen {
221                 ks = append(ks, peer)
222         }
223
224         // Add active peers to the list
225         for conn := range t.conns {
226                 ks = append(ks, PeerInfo{
227                         Id:     conn.PeerID,
228                         Addr:   conn.RemoteAddr,
229                         Source: conn.Discovery,
230                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
231                         // > But if we're not connected to them with an encrypted connection, I couldn't say
232                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
233                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
234                         // > connection. It's probably easiest to do the latter for now.
235                         // https://github.com/anacrolix/torrent/pull/188
236                         SupportsEncryption: conn.headerEncrypted,
237                 })
238         }
239
240         return
241 }
242
243 func (t *Torrent) setChunkSize(size pp.Integer) {
244         t.chunkSize = size
245         t.chunkPool = sync.Pool{
246                 New: func() interface{} {
247                         b := make([]byte, size)
248                         return &b
249                 },
250         }
251 }
252
253 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
254         return t._completedPieces.Contains(bitmap.BitIndex(piece))
255 }
256
257 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
258         if t.storage == nil {
259                 return storage.Completion{Complete: false, Ok: true}
260         }
261         return t.pieces[piece].Storage().Completion()
262 }
263
264 // There's a connection to that address already.
265 func (t *Torrent) addrActive(addr string) bool {
266         if _, ok := t.halfOpen[addr]; ok {
267                 return true
268         }
269         for c := range t.conns {
270                 ra := c.RemoteAddr
271                 if ra.String() == addr {
272                         return true
273                 }
274         }
275         return false
276 }
277
278 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
279         return t.appendConns(ret, func(conn *PeerConn) bool {
280                 return !conn.closed.IsSet()
281         })
282 }
283
284 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
285         for c := range t.conns {
286                 if f(c) {
287                         ret = append(ret, c)
288                 }
289         }
290         return ret
291 }
292
293 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
294         cl := t.cl
295         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
296         if t.closed.IsSet() {
297                 return false
298         }
299         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
300                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
301                         torrent.Add("peers not added because of bad addr", 1)
302                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
303                         return false
304                 }
305         }
306         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
307                 torrent.Add("peers replaced", 1)
308                 if !replaced.equal(p) {
309                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
310                         added = true
311                 }
312         } else {
313                 added = true
314         }
315         t.openNewConns()
316         for t.peers.Len() > cl.config.TorrentPeersHighWater {
317                 _, ok := t.peers.DeleteMin()
318                 if ok {
319                         torrent.Add("excess reserve peers discarded", 1)
320                 }
321         }
322         return
323 }
324
325 func (t *Torrent) invalidateMetadata() {
326         for i := 0; i < len(t.metadataCompletedChunks); i++ {
327                 t.metadataCompletedChunks[i] = false
328         }
329         t.nameMu.Lock()
330         t.gotMetainfoC = make(chan struct{})
331         t.info = nil
332         t.nameMu.Unlock()
333 }
334
335 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
336         if t.haveInfo() {
337                 return
338         }
339         if index >= len(t.metadataCompletedChunks) {
340                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
341                 return
342         }
343         copy(t.metadataBytes[(1<<14)*index:], data)
344         t.metadataCompletedChunks[index] = true
345 }
346
347 func (t *Torrent) metadataPieceCount() int {
348         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
349 }
350
351 func (t *Torrent) haveMetadataPiece(piece int) bool {
352         if t.haveInfo() {
353                 return (1<<14)*piece < len(t.metadataBytes)
354         } else {
355                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
356         }
357 }
358
359 func (t *Torrent) metadataSize() int {
360         return len(t.metadataBytes)
361 }
362
363 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
364         for i := 0; i < len(info.Pieces); i += sha1.Size {
365                 ret = append(ret, info.Pieces[i:i+sha1.Size])
366         }
367         return
368 }
369
370 func (t *Torrent) makePieces() {
371         hashes := infoPieceHashes(t.info)
372         t.pieces = make([]Piece, len(hashes))
373         for i, hash := range hashes {
374                 piece := &t.pieces[i]
375                 piece.t = t
376                 piece.index = pieceIndex(i)
377                 piece.noPendingWrites.L = &piece.pendingWritesMutex
378                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
379                 files := *t.files
380                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
381                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
382                 piece.files = files[beginFile:endFile]
383                 piece.undirtiedChunksIter = undirtiedChunksIter{
384                         TorrentDirtyChunks: &t.dirtyChunks,
385                         StartRequestIndex:  piece.requestIndexOffset(),
386                         EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
387                 }
388         }
389 }
390
391 // Returns the index of the first file containing the piece. files must be
392 // ordered by offset.
393 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
394         for i, f := range files {
395                 if f.offset+f.length > pieceOffset {
396                         return i
397                 }
398         }
399         return 0
400 }
401
402 // Returns the index after the last file containing the piece. files must be
403 // ordered by offset.
404 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
405         for i, f := range files {
406                 if f.offset+f.length >= pieceEndOffset {
407                         return i + 1
408                 }
409         }
410         return 0
411 }
412
413 func (t *Torrent) cacheLength() {
414         var l int64
415         for _, f := range t.info.UpvertedFiles() {
416                 l += f.Length
417         }
418         t.length = &l
419 }
420
421 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
422 // separately.
423 func (t *Torrent) setInfo(info *metainfo.Info) error {
424         if err := validateInfo(info); err != nil {
425                 return fmt.Errorf("bad info: %s", err)
426         }
427         if t.storageOpener != nil {
428                 var err error
429                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
430                 if err != nil {
431                         return fmt.Errorf("error opening torrent storage: %s", err)
432                 }
433         }
434         t.nameMu.Lock()
435         t.info = info
436         t.nameMu.Unlock()
437         t.updateComplete()
438         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
439         t.displayName = "" // Save a few bytes lol.
440         t.initFiles()
441         t.cacheLength()
442         t.makePieces()
443         return nil
444 }
445
446 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
447         return request_strategy.PieceRequestOrderKey{
448                 InfoHash: t.infoHash,
449                 Index:    i,
450         }
451 }
452
453 // This seems to be all the follow-up tasks after info is set, that can't fail.
454 func (t *Torrent) onSetInfo() {
455         t.initPieceRequestOrder()
456         for i := range t.pieces {
457                 p := &t.pieces[i]
458                 // Need to add relativeAvailability before updating piece completion, as that may result in conns
459                 // being dropped.
460                 if p.relativeAvailability != 0 {
461                         panic(p.relativeAvailability)
462                 }
463                 p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
464                 t.addRequestOrderPiece(i)
465                 t.updatePieceCompletion(pieceIndex(i))
466                 if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
467                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
468                         t.queuePieceCheck(pieceIndex(i))
469                 }
470         }
471         t.cl.event.Broadcast()
472         close(t.gotMetainfoC)
473         t.updateWantPeersEvent()
474         t.pendingRequests = make(map[RequestIndex]*Peer)
475         t.lastRequested = make(map[RequestIndex]time.Time)
476         t.tryCreateMorePieceHashers()
477         t.iterPeers(func(p *Peer) {
478                 p.onGotInfo(t.info)
479                 p.updateRequests("onSetInfo")
480         })
481 }
482
483 // Called when metadata for a torrent becomes available.
484 func (t *Torrent) setInfoBytesLocked(b []byte) error {
485         if metainfo.HashBytes(b) != t.infoHash {
486                 return errors.New("info bytes have wrong hash")
487         }
488         var info metainfo.Info
489         if err := bencode.Unmarshal(b, &info); err != nil {
490                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
491         }
492         t.metadataBytes = b
493         t.metadataCompletedChunks = nil
494         if t.info != nil {
495                 return nil
496         }
497         if err := t.setInfo(&info); err != nil {
498                 return err
499         }
500         t.onSetInfo()
501         return nil
502 }
503
504 func (t *Torrent) haveAllMetadataPieces() bool {
505         if t.haveInfo() {
506                 return true
507         }
508         if t.metadataCompletedChunks == nil {
509                 return false
510         }
511         for _, have := range t.metadataCompletedChunks {
512                 if !have {
513                         return false
514                 }
515         }
516         return true
517 }
518
519 // TODO: Propagate errors to disconnect peer.
520 func (t *Torrent) setMetadataSize(size int) (err error) {
521         if t.haveInfo() {
522                 // We already know the correct metadata size.
523                 return
524         }
525         if uint32(size) > maxMetadataSize {
526                 return errors.New("bad size")
527         }
528         if len(t.metadataBytes) == size {
529                 return
530         }
531         t.metadataBytes = make([]byte, size)
532         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
533         t.metadataChanged.Broadcast()
534         for c := range t.conns {
535                 c.requestPendingMetadata()
536         }
537         return
538 }
539
540 // The current working name for the torrent. Either the name in the info dict,
541 // or a display name given such as by the dn value in a magnet link, or "".
542 func (t *Torrent) name() string {
543         t.nameMu.RLock()
544         defer t.nameMu.RUnlock()
545         if t.haveInfo() {
546                 return t.info.Name
547         }
548         if t.displayName != "" {
549                 return t.displayName
550         }
551         return "infohash:" + t.infoHash.HexString()
552 }
553
554 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
555         p := &t.pieces[index]
556         ret.Priority = t.piecePriority(index)
557         ret.Completion = p.completion()
558         ret.QueuedForHash = p.queuedForHash()
559         ret.Hashing = p.hashing
560         ret.Checking = ret.QueuedForHash || ret.Hashing
561         ret.Marking = p.marking
562         if !ret.Complete && t.piecePartiallyDownloaded(index) {
563                 ret.Partial = true
564         }
565         return
566 }
567
568 func (t *Torrent) metadataPieceSize(piece int) int {
569         return metadataPieceSize(len(t.metadataBytes), piece)
570 }
571
572 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
573         return pp.Message{
574                 Type:       pp.Extended,
575                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
576                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
577                         Piece:     piece,
578                         TotalSize: len(t.metadataBytes),
579                         Type:      msgType,
580                 }), data...),
581         }
582 }
583
584 type pieceAvailabilityRun struct {
585         Count        pieceIndex
586         Availability int
587 }
588
589 func (me pieceAvailabilityRun) String() string {
590         return fmt.Sprintf("%v(%v)", me.Count, me.Availability)
591 }
592
593 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
594         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
595                 ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
596         })
597         for i := range t.pieces {
598                 rle.Append(t.pieces[i].availability(), 1)
599         }
600         rle.Flush()
601         return
602 }
603
604 func (t *Torrent) pieceAvailabilityFrequencies() (freqs []int) {
605         freqs = make([]int, t.numActivePeers()+1)
606         for i := range t.pieces {
607                 freqs[t.piece(i).availability()]++
608         }
609         return
610 }
611
612 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
613         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
614                 ret = append(ret, PieceStateRun{
615                         PieceState: el.(PieceState),
616                         Length:     int(count),
617                 })
618         })
619         for index := range t.pieces {
620                 rle.Append(t.pieceState(pieceIndex(index)), 1)
621         }
622         rle.Flush()
623         return
624 }
625
626 // Produces a small string representing a PieceStateRun.
627 func (psr PieceStateRun) String() (ret string) {
628         ret = fmt.Sprintf("%d", psr.Length)
629         ret += func() string {
630                 switch psr.Priority {
631                 case PiecePriorityNext:
632                         return "N"
633                 case PiecePriorityNormal:
634                         return "."
635                 case PiecePriorityReadahead:
636                         return "R"
637                 case PiecePriorityNow:
638                         return "!"
639                 case PiecePriorityHigh:
640                         return "H"
641                 default:
642                         return ""
643                 }
644         }()
645         if psr.Hashing {
646                 ret += "H"
647         }
648         if psr.QueuedForHash {
649                 ret += "Q"
650         }
651         if psr.Marking {
652                 ret += "M"
653         }
654         if psr.Partial {
655                 ret += "P"
656         }
657         if psr.Complete {
658                 ret += "C"
659         }
660         if !psr.Ok {
661                 ret += "?"
662         }
663         return
664 }
665
666 func (t *Torrent) writeStatus(w io.Writer) {
667         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
668         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
669         if !t.haveInfo() {
670                 fmt.Fprintf(w, "Metadata have: ")
671                 for _, h := range t.metadataCompletedChunks {
672                         fmt.Fprintf(w, "%c", func() rune {
673                                 if h {
674                                         return 'H'
675                                 } else {
676                                         return '.'
677                                 }
678                         }())
679                 }
680                 fmt.Fprintln(w)
681         }
682         fmt.Fprintf(w, "Piece length: %s\n",
683                 func() string {
684                         if t.haveInfo() {
685                                 return fmt.Sprintf("%v (%v chunks)",
686                                         t.usualPieceSize(),
687                                         float64(t.usualPieceSize())/float64(t.chunkSize))
688                         } else {
689                                 return "no info"
690                         }
691                 }(),
692         )
693         if t.info != nil {
694                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
695                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
696                 // Generates a huge, unhelpful listing when piece availability is very scattered. Prefer
697                 // availability frequencies instead.
698                 if false {
699                         fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
700                                 for _, run := range t.pieceAvailabilityRuns() {
701                                         ret = append(ret, run.String())
702                                 }
703                                 return
704                         }(), " "))
705                 }
706                 fmt.Fprintf(w, "Piece availability frequency: %v\n", strings.Join(
707                         func() (ret []string) {
708                                 for avail, freq := range t.pieceAvailabilityFrequencies() {
709                                         if freq == 0 {
710                                                 continue
711                                         }
712                                         ret = append(ret, fmt.Sprintf("%v: %v", avail, freq))
713                                 }
714                                 return
715                         }(),
716                         ", "))
717         }
718         fmt.Fprintf(w, "Reader Pieces:")
719         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
720                 fmt.Fprintf(w, " %d:%d", begin, end)
721                 return true
722         })
723         fmt.Fprintln(w)
724
725         fmt.Fprintf(w, "Enabled trackers:\n")
726         func() {
727                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
728                 fmt.Fprintf(tw, "    URL\tExtra\n")
729                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
730                         lu := l.URL()
731                         ru := r.URL()
732                         var luns, runs url.URL = *lu, *ru
733                         luns.Scheme = ""
734                         runs.Scheme = ""
735                         var ml missinggo.MultiLess
736                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
737                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
738                         return ml.Less()
739                 }).([]torrentTrackerAnnouncer) {
740                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
741                 }
742                 tw.Flush()
743         }()
744
745         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
746
747         spew.NewDefaultConfig()
748         spew.Fdump(w, t.statsLocked())
749
750         peers := t.peersAsSlice()
751         sort.Slice(peers, func(_i, _j int) bool {
752                 i := peers[_i]
753                 j := peers[_j]
754                 if less, ok := multiless.New().EagerSameLess(
755                         i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
756                 ).LessOk(); ok {
757                         return less
758                 }
759                 return worseConn(i, j)
760         })
761         for i, c := range peers {
762                 fmt.Fprintf(w, "%2d. ", i+1)
763                 c.writeStatus(w, t)
764         }
765 }
766
767 func (t *Torrent) haveInfo() bool {
768         return t.info != nil
769 }
770
771 // Returns a run-time generated MetaInfo that includes the info bytes and
772 // announce-list as currently known to the client.
773 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
774         return metainfo.MetaInfo{
775                 CreationDate: time.Now().Unix(),
776                 Comment:      "dynamic metainfo from client",
777                 CreatedBy:    "go.torrent",
778                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
779                 InfoBytes: func() []byte {
780                         if t.haveInfo() {
781                                 return t.metadataBytes
782                         } else {
783                                 return nil
784                         }
785                 }(),
786                 UrlList: func() []string {
787                         ret := make([]string, 0, len(t.webSeeds))
788                         for url := range t.webSeeds {
789                                 ret = append(ret, url)
790                         }
791                         return ret
792                 }(),
793         }
794 }
795
796 // Get bytes left
797 func (t *Torrent) BytesMissing() (n int64) {
798         t.cl.rLock()
799         n = t.bytesMissingLocked()
800         t.cl.rUnlock()
801         return
802 }
803
804 func (t *Torrent) bytesMissingLocked() int64 {
805         return t.bytesLeft()
806 }
807
808 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
809         roaring.Flip(b, 0, end).Iterate(cb)
810 }
811
812 func (t *Torrent) bytesLeft() (left int64) {
813         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
814                 p := t.piece(pieceIndex(x))
815                 left += int64(p.length() - p.numDirtyBytes())
816                 return true
817         })
818         return
819 }
820
821 // Bytes left to give in tracker announces.
822 func (t *Torrent) bytesLeftAnnounce() int64 {
823         if t.haveInfo() {
824                 return t.bytesLeft()
825         } else {
826                 return -1
827         }
828 }
829
830 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
831         if t.pieceComplete(piece) {
832                 return false
833         }
834         if t.pieceAllDirty(piece) {
835                 return false
836         }
837         return t.pieces[piece].hasDirtyChunks()
838 }
839
840 func (t *Torrent) usualPieceSize() int {
841         return int(t.info.PieceLength)
842 }
843
844 func (t *Torrent) numPieces() pieceIndex {
845         return pieceIndex(t.info.NumPieces())
846 }
847
848 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
849         return pieceIndex(t._completedPieces.GetCardinality())
850 }
851
852 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
853         t.closed.Set()
854         if t.storage != nil {
855                 wg.Add(1)
856                 go func() {
857                         defer wg.Done()
858                         t.storageLock.Lock()
859                         defer t.storageLock.Unlock()
860                         if f := t.storage.Close; f != nil {
861                                 err1 := f()
862                                 if err1 != nil {
863                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
864                                 }
865                         }
866                 }()
867         }
868         t.iterPeers(func(p *Peer) {
869                 p.close()
870         })
871         if t.storage != nil {
872                 t.deletePieceRequestOrder()
873         }
874         for i := range t.pieces {
875                 p := t.piece(i)
876                 if p.relativeAvailability != 0 {
877                         panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
878                 }
879         }
880         t.pex.Reset()
881         t.cl.event.Broadcast()
882         t.pieceStateChanges.Close()
883         t.updateWantPeersEvent()
884         return
885 }
886
887 func (t *Torrent) requestOffset(r Request) int64 {
888         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
889 }
890
891 // Return the request that would include the given offset into the torrent data. Returns !ok if
892 // there is no such request.
893 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
894         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
895 }
896
897 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
898         defer perf.ScopeTimerErr(&err)()
899         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
900         if err == nil && n != len(data) {
901                 err = io.ErrShortWrite
902         }
903         return err
904 }
905
906 func (t *Torrent) bitfield() (bf []bool) {
907         bf = make([]bool, t.numPieces())
908         t._completedPieces.Iterate(func(piece uint32) (again bool) {
909                 bf[piece] = true
910                 return true
911         })
912         return
913 }
914
915 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
916         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
917 }
918
919 func (t *Torrent) chunksPerRegularPiece() uint32 {
920         return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
921 }
922
923 func (t *Torrent) numRequests() RequestIndex {
924         if t.numPieces() == 0 {
925                 return 0
926         }
927         return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
928 }
929
930 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
931         t.dirtyChunks.RemoveRange(
932                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
933                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
934 }
935
936 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
937         if t.info.PieceLength == 0 {
938                 // There will be no variance amongst pieces. Only pain.
939                 return 0
940         }
941         if piece == t.numPieces()-1 {
942                 ret := pp.Integer(*t.length % t.info.PieceLength)
943                 if ret != 0 {
944                         return ret
945                 }
946         }
947         return pp.Integer(t.info.PieceLength)
948 }
949
950 func (t *Torrent) smartBanBlockCheckingWriter(piece pieceIndex) *blockCheckingWriter {
951         return &blockCheckingWriter{
952                 cache:        &t.smartBanCache,
953                 requestIndex: t.pieceRequestIndexOffset(piece),
954                 chunkSize:    t.chunkSize.Int(),
955         }
956 }
957
958 func (t *Torrent) hashPiece(piece pieceIndex) (
959         ret metainfo.Hash,
960         // These are peers that sent us blocks that differ from what we hash here.
961         differingPeers map[bannableAddr]struct{},
962         err error,
963 ) {
964         p := t.piece(piece)
965         p.waitNoPendingWrites()
966         storagePiece := t.pieces[piece].Storage()
967
968         // Does the backend want to do its own hashing?
969         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
970                 var sum metainfo.Hash
971                 // log.Printf("A piece decided to self-hash: %d", piece)
972                 sum, err = i.SelfHash()
973                 missinggo.CopyExact(&ret, sum)
974                 return
975         }
976
977         hash := pieceHash.New()
978         const logPieceContents = false
979         smartBanWriter := t.smartBanBlockCheckingWriter(piece)
980         writers := []io.Writer{hash, smartBanWriter}
981         var examineBuf bytes.Buffer
982         if logPieceContents {
983                 writers = append(writers, &examineBuf)
984         }
985         _, err = storagePiece.WriteTo(io.MultiWriter(writers...))
986         if logPieceContents {
987                 log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
988         }
989         smartBanWriter.Flush()
990         differingPeers = smartBanWriter.badPeers
991         missinggo.CopyExact(&ret, hash.Sum(nil))
992         return
993 }
994
995 func (t *Torrent) haveAnyPieces() bool {
996         return !t._completedPieces.IsEmpty()
997 }
998
999 func (t *Torrent) haveAllPieces() bool {
1000         if !t.haveInfo() {
1001                 return false
1002         }
1003         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
1004 }
1005
1006 func (t *Torrent) havePiece(index pieceIndex) bool {
1007         return t.haveInfo() && t.pieceComplete(index)
1008 }
1009
1010 func (t *Torrent) maybeDropMutuallyCompletePeer(
1011         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
1012         // okay?
1013         p *Peer,
1014 ) {
1015         if !t.cl.config.DropMutuallyCompletePeers {
1016                 return
1017         }
1018         if !t.haveAllPieces() {
1019                 return
1020         }
1021         if all, known := p.peerHasAllPieces(); !(known && all) {
1022                 return
1023         }
1024         if p.useful() {
1025                 return
1026         }
1027         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
1028         p.drop()
1029 }
1030
1031 func (t *Torrent) haveChunk(r Request) (ret bool) {
1032         // defer func() {
1033         //      log.Println("have chunk", r, ret)
1034         // }()
1035         if !t.haveInfo() {
1036                 return false
1037         }
1038         if t.pieceComplete(pieceIndex(r.Index)) {
1039                 return true
1040         }
1041         p := &t.pieces[r.Index]
1042         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
1043 }
1044
1045 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
1046         return chunkIndexType(cs.Begin / chunkSize)
1047 }
1048
1049 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
1050         return t._pendingPieces.Contains(uint32(index))
1051 }
1052
1053 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1054 // conns (which is a map).
1055 var peerConnSlices sync.Pool
1056
1057 func getPeerConnSlice(cap int) []*PeerConn {
1058         getInterface := peerConnSlices.Get()
1059         if getInterface == nil {
1060                 return make([]*PeerConn, 0, cap)
1061         } else {
1062                 return getInterface.([]*PeerConn)[:0]
1063         }
1064 }
1065
1066 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1067 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1068 // established connections for more than a minute. This is O(n log n). If there was a way to not
1069 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1070 func (t *Torrent) worstBadConn() (ret *PeerConn) {
1071         wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
1072         defer peerConnSlices.Put(wcs.conns)
1073         wcs.initKeys()
1074         heap.Init(&wcs)
1075         for wcs.Len() != 0 {
1076                 c := heap.Pop(&wcs).(*PeerConn)
1077                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1078                         return c
1079                 }
1080                 // If the connection is in the worst half of the established
1081                 // connection quota and is older than a minute.
1082                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1083                         // Give connections 1 minute to prove themselves.
1084                         if time.Since(c.completedHandshake) > time.Minute {
1085                                 return c
1086                         }
1087                 }
1088         }
1089         return nil
1090 }
1091
1092 type PieceStateChange struct {
1093         Index int
1094         PieceState
1095 }
1096
1097 func (t *Torrent) publishPieceChange(piece pieceIndex) {
1098         t.cl._mu.Defer(func() {
1099                 cur := t.pieceState(piece)
1100                 p := &t.pieces[piece]
1101                 if cur != p.publicPieceState {
1102                         p.publicPieceState = cur
1103                         t.pieceStateChanges.Publish(PieceStateChange{
1104                                 int(piece),
1105                                 cur,
1106                         })
1107                 }
1108         })
1109 }
1110
1111 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1112         if t.pieceComplete(piece) {
1113                 return 0
1114         }
1115         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1116 }
1117
1118 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1119         return t.pieces[piece].allChunksDirty()
1120 }
1121
1122 func (t *Torrent) readersChanged() {
1123         t.updateReaderPieces()
1124         t.updateAllPiecePriorities("Torrent.readersChanged")
1125 }
1126
1127 func (t *Torrent) updateReaderPieces() {
1128         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1129 }
1130
1131 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1132         if from == to {
1133                 return
1134         }
1135         t.updateReaderPieces()
1136         // Order the ranges, high and low.
1137         l, h := from, to
1138         if l.begin > h.begin {
1139                 l, h = h, l
1140         }
1141         if l.end < h.begin {
1142                 // Two distinct ranges.
1143                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1144                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1145         } else {
1146                 // Ranges overlap.
1147                 end := l.end
1148                 if h.end > end {
1149                         end = h.end
1150                 }
1151                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1152         }
1153 }
1154
1155 func (t *Torrent) maybeNewConns() {
1156         // Tickle the accept routine.
1157         t.cl.event.Broadcast()
1158         t.openNewConns()
1159 }
1160
1161 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
1162         if t._pendingPieces.Contains(uint32(piece)) {
1163                 t.iterPeers(func(c *Peer) {
1164                         // if c.requestState.Interested {
1165                         //      return
1166                         // }
1167                         if !c.isLowOnRequests() {
1168                                 return
1169                         }
1170                         if !c.peerHasPiece(piece) {
1171                                 return
1172                         }
1173                         if c.requestState.Interested && c.peerChoking && !c.peerAllowedFast.Contains(uint32(piece)) {
1174                                 return
1175                         }
1176                         c.updateRequests(reason)
1177                 })
1178         }
1179         t.maybeNewConns()
1180         t.publishPieceChange(piece)
1181 }
1182
1183 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1184         if !t.closed.IsSet() {
1185                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1186                 // request order.
1187                 t.updatePieceRequestOrder(piece)
1188         }
1189         p := &t.pieces[piece]
1190         newPrio := p.uncachedPriority()
1191         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1192         if newPrio == PiecePriorityNone {
1193                 if !t._pendingPieces.CheckedRemove(uint32(piece)) {
1194                         return
1195                 }
1196         } else {
1197                 if !t._pendingPieces.CheckedAdd(uint32(piece)) {
1198                         return
1199                 }
1200         }
1201         t.piecePriorityChanged(piece, reason)
1202 }
1203
1204 func (t *Torrent) updateAllPiecePriorities(reason string) {
1205         t.updatePiecePriorities(0, t.numPieces(), reason)
1206 }
1207
1208 // Update all piece priorities in one hit. This function should have the same
1209 // output as updatePiecePriority, but across all pieces.
1210 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1211         for i := begin; i < end; i++ {
1212                 t.updatePiecePriority(i, reason)
1213         }
1214 }
1215
1216 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1217 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1218         if off >= *t.length {
1219                 return
1220         }
1221         if off < 0 {
1222                 size += off
1223                 off = 0
1224         }
1225         if size <= 0 {
1226                 return
1227         }
1228         begin = pieceIndex(off / t.info.PieceLength)
1229         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1230         if end > pieceIndex(t.info.NumPieces()) {
1231                 end = pieceIndex(t.info.NumPieces())
1232         }
1233         return
1234 }
1235
1236 // Returns true if all iterations complete without breaking. Returns the read regions for all
1237 // readers. The reader regions should not be merged as some callers depend on this method to
1238 // enumerate readers.
1239 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1240         for r := range t.readers {
1241                 p := r.pieces
1242                 if p.begin >= p.end {
1243                         continue
1244                 }
1245                 if !f(p.begin, p.end) {
1246                         return false
1247                 }
1248         }
1249         return true
1250 }
1251
1252 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1253         return t.piece(piece).uncachedPriority()
1254 }
1255
1256 func (t *Torrent) pendRequest(req RequestIndex) {
1257         t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
1258 }
1259
1260 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1261         t.cl.event.Broadcast()
1262         if t.pieceComplete(piece) {
1263                 t.onPieceCompleted(piece)
1264         } else {
1265                 t.onIncompletePiece(piece)
1266         }
1267         t.updatePiecePriority(piece, reason)
1268 }
1269
1270 func (t *Torrent) numReceivedConns() (ret int) {
1271         for c := range t.conns {
1272                 if c.Discovery == PeerSourceIncoming {
1273                         ret++
1274                 }
1275         }
1276         return
1277 }
1278
1279 func (t *Torrent) maxHalfOpen() int {
1280         // Note that if we somehow exceed the maximum established conns, we want
1281         // the negative value to have an effect.
1282         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1283         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1284         // We want to allow some experimentation with new peers, and to try to
1285         // upset an oversupply of received connections.
1286         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1287 }
1288
1289 func (t *Torrent) openNewConns() (initiated int) {
1290         defer t.updateWantPeersEvent()
1291         for t.peers.Len() != 0 {
1292                 if !t.wantConns() {
1293                         return
1294                 }
1295                 if len(t.halfOpen) >= t.maxHalfOpen() {
1296                         return
1297                 }
1298                 if len(t.cl.dialers) == 0 {
1299                         return
1300                 }
1301                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1302                         return
1303                 }
1304                 p := t.peers.PopMax()
1305                 t.initiateConn(p)
1306                 initiated++
1307         }
1308         return
1309 }
1310
1311 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1312         p := t.piece(piece)
1313         uncached := t.pieceCompleteUncached(piece)
1314         cached := p.completion()
1315         changed := cached != uncached
1316         complete := uncached.Complete
1317         p.storageCompletionOk = uncached.Ok
1318         x := uint32(piece)
1319         if complete {
1320                 t._completedPieces.Add(x)
1321                 t.openNewConns()
1322         } else {
1323                 t._completedPieces.Remove(x)
1324         }
1325         p.t.updatePieceRequestOrder(piece)
1326         t.updateComplete()
1327         if complete && len(p.dirtiers) != 0 {
1328                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1329         }
1330         if changed {
1331                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger)
1332                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1333         }
1334         return changed
1335 }
1336
1337 // Non-blocking read. Client lock is not required.
1338 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1339         for len(b) != 0 {
1340                 p := &t.pieces[off/t.info.PieceLength]
1341                 p.waitNoPendingWrites()
1342                 var n1 int
1343                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1344                 if n1 == 0 {
1345                         break
1346                 }
1347                 off += int64(n1)
1348                 n += n1
1349                 b = b[n1:]
1350         }
1351         return
1352 }
1353
1354 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1355 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1356 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1357 func (t *Torrent) maybeCompleteMetadata() error {
1358         if t.haveInfo() {
1359                 // Nothing to do.
1360                 return nil
1361         }
1362         if !t.haveAllMetadataPieces() {
1363                 // Don't have enough metadata pieces.
1364                 return nil
1365         }
1366         err := t.setInfoBytesLocked(t.metadataBytes)
1367         if err != nil {
1368                 t.invalidateMetadata()
1369                 return fmt.Errorf("error setting info bytes: %s", err)
1370         }
1371         if t.cl.config.Debug {
1372                 t.logger.Printf("%s: got metadata from peers", t)
1373         }
1374         return nil
1375 }
1376
1377 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1378         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1379                 if end > begin {
1380                         now.Add(bitmap.BitIndex(begin))
1381                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1382                 }
1383                 return true
1384         })
1385         return
1386 }
1387
1388 func (t *Torrent) needData() bool {
1389         if t.closed.IsSet() {
1390                 return false
1391         }
1392         if !t.haveInfo() {
1393                 return true
1394         }
1395         return !t._pendingPieces.IsEmpty()
1396 }
1397
1398 func appendMissingStrings(old, new []string) (ret []string) {
1399         ret = old
1400 new:
1401         for _, n := range new {
1402                 for _, o := range old {
1403                         if o == n {
1404                                 continue new
1405                         }
1406                 }
1407                 ret = append(ret, n)
1408         }
1409         return
1410 }
1411
1412 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1413         ret = existing
1414         for minNumTiers > len(ret) {
1415                 ret = append(ret, nil)
1416         }
1417         return
1418 }
1419
1420 func (t *Torrent) addTrackers(announceList [][]string) {
1421         fullAnnounceList := &t.metainfo.AnnounceList
1422         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1423         for tierIndex, trackerURLs := range announceList {
1424                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1425         }
1426         t.startMissingTrackerScrapers()
1427         t.updateWantPeersEvent()
1428 }
1429
1430 // Don't call this before the info is available.
1431 func (t *Torrent) bytesCompleted() int64 {
1432         if !t.haveInfo() {
1433                 return 0
1434         }
1435         return *t.length - t.bytesLeft()
1436 }
1437
1438 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1439         t.cl.lock()
1440         defer t.cl.unlock()
1441         return t.setInfoBytesLocked(b)
1442 }
1443
1444 // Returns true if connection is removed from torrent.Conns.
1445 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1446         if !c.closed.IsSet() {
1447                 panic("connection is not closed")
1448                 // There are behaviours prevented by the closed state that will fail
1449                 // if the connection has been deleted.
1450         }
1451         _, ret = t.conns[c]
1452         delete(t.conns, c)
1453         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1454         // the drop event against the PexConnState instead.
1455         if ret {
1456                 if !t.cl.config.DisablePEX {
1457                         t.pex.Drop(c)
1458                 }
1459         }
1460         torrent.Add("deleted connections", 1)
1461         if !c.deleteAllRequests().IsEmpty() {
1462                 t.iterPeers(func(p *Peer) {
1463                         if p.isLowOnRequests() {
1464                                 p.updateRequests("Torrent.deletePeerConn")
1465                         }
1466                 })
1467         }
1468         t.assertPendingRequests()
1469         if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
1470                 panic(t.connsWithAllPieces)
1471         }
1472         return
1473 }
1474
1475 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1476         if t.deleteConnWithAllPieces(p) {
1477                 return
1478         }
1479         if !t.haveInfo() {
1480                 return
1481         }
1482         p.peerPieces().Iterate(func(i uint32) bool {
1483                 p.t.decPieceAvailability(pieceIndex(i))
1484                 return true
1485         })
1486 }
1487
1488 func (t *Torrent) assertPendingRequests() {
1489         if !check {
1490                 return
1491         }
1492         // var actual pendingRequests
1493         // if t.haveInfo() {
1494         //      actual.m = make([]int, t.numRequests())
1495         // }
1496         // t.iterPeers(func(p *Peer) {
1497         //      p.requestState.Requests.Iterate(func(x uint32) bool {
1498         //              actual.Inc(x)
1499         //              return true
1500         //      })
1501         // })
1502         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1503         // if diff != "" {
1504         //      panic(diff)
1505         // }
1506 }
1507
1508 func (t *Torrent) dropConnection(c *PeerConn) {
1509         t.cl.event.Broadcast()
1510         c.close()
1511         if t.deletePeerConn(c) {
1512                 t.openNewConns()
1513         }
1514 }
1515
1516 // Peers as in contact information for dialing out.
1517 func (t *Torrent) wantPeers() bool {
1518         if t.closed.IsSet() {
1519                 return false
1520         }
1521         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1522                 return false
1523         }
1524         return t.wantConns()
1525 }
1526
1527 func (t *Torrent) updateWantPeersEvent() {
1528         if t.wantPeers() {
1529                 t.wantPeersEvent.Set()
1530         } else {
1531                 t.wantPeersEvent.Clear()
1532         }
1533 }
1534
1535 // Returns whether the client should make effort to seed the torrent.
1536 func (t *Torrent) seeding() bool {
1537         cl := t.cl
1538         if t.closed.IsSet() {
1539                 return false
1540         }
1541         if t.dataUploadDisallowed {
1542                 return false
1543         }
1544         if cl.config.NoUpload {
1545                 return false
1546         }
1547         if !cl.config.Seed {
1548                 return false
1549         }
1550         if cl.config.DisableAggressiveUpload && t.needData() {
1551                 return false
1552         }
1553         return true
1554 }
1555
1556 func (t *Torrent) onWebRtcConn(
1557         c datachannel.ReadWriteCloser,
1558         dcc webtorrent.DataChannelContext,
1559 ) {
1560         defer c.Close()
1561         netConn := webrtcNetConn{
1562                 ReadWriteCloser:    c,
1563                 DataChannelContext: dcc,
1564         }
1565         peerRemoteAddr := netConn.RemoteAddr()
1566         if t.cl.badPeerAddr(peerRemoteAddr) {
1567                 return
1568         }
1569         pc, err := t.cl.initiateProtocolHandshakes(
1570                 context.Background(),
1571                 netConn,
1572                 t,
1573                 dcc.LocalOffered,
1574                 false,
1575                 netConn.RemoteAddr(),
1576                 webrtcNetwork,
1577                 fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
1578         )
1579         if err != nil {
1580                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1581                 return
1582         }
1583         if dcc.LocalOffered {
1584                 pc.Discovery = PeerSourceTracker
1585         } else {
1586                 pc.Discovery = PeerSourceIncoming
1587         }
1588         pc.conn.SetWriteDeadline(time.Time{})
1589         t.cl.lock()
1590         defer t.cl.unlock()
1591         err = t.cl.runHandshookConn(pc, t)
1592         if err != nil {
1593                 t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
1594         }
1595 }
1596
1597 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1598         err := t.cl.runHandshookConn(pc, t)
1599         if err != nil || logAll {
1600                 t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
1601         }
1602 }
1603
1604 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1605         t.logRunHandshookConn(pc, false, log.Debug)
1606 }
1607
1608 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1609         wtc, release := t.cl.websocketTrackers.Get(u.String())
1610         go func() {
1611                 <-t.closed.Done()
1612                 release()
1613         }()
1614         wst := websocketTrackerStatus{u, wtc}
1615         go func() {
1616                 err := wtc.Announce(tracker.Started, t.infoHash)
1617                 if err != nil {
1618                         t.logger.WithDefaultLevel(log.Warning).Printf(
1619                                 "error in initial announce to %q: %v",
1620                                 u.String(), err,
1621                         )
1622                 }
1623         }()
1624         return wst
1625 }
1626
1627 func (t *Torrent) startScrapingTracker(_url string) {
1628         if _url == "" {
1629                 return
1630         }
1631         u, err := url.Parse(_url)
1632         if err != nil {
1633                 // URLs with a leading '*' appear to be a uTorrent convention to
1634                 // disable trackers.
1635                 if _url[0] != '*' {
1636                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1637                 }
1638                 return
1639         }
1640         if u.Scheme == "udp" {
1641                 u.Scheme = "udp4"
1642                 t.startScrapingTracker(u.String())
1643                 u.Scheme = "udp6"
1644                 t.startScrapingTracker(u.String())
1645                 return
1646         }
1647         if _, ok := t.trackerAnnouncers[_url]; ok {
1648                 return
1649         }
1650         sl := func() torrentTrackerAnnouncer {
1651                 switch u.Scheme {
1652                 case "ws", "wss":
1653                         if t.cl.config.DisableWebtorrent {
1654                                 return nil
1655                         }
1656                         return t.startWebsocketAnnouncer(*u)
1657                 case "udp4":
1658                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1659                                 return nil
1660                         }
1661                 case "udp6":
1662                         if t.cl.config.DisableIPv6 {
1663                                 return nil
1664                         }
1665                 }
1666                 newAnnouncer := &trackerScraper{
1667                         u:               *u,
1668                         t:               t,
1669                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1670                 }
1671                 go newAnnouncer.Run()
1672                 return newAnnouncer
1673         }()
1674         if sl == nil {
1675                 return
1676         }
1677         if t.trackerAnnouncers == nil {
1678                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1679         }
1680         t.trackerAnnouncers[_url] = sl
1681 }
1682
1683 // Adds and starts tracker scrapers for tracker URLs that aren't already
1684 // running.
1685 func (t *Torrent) startMissingTrackerScrapers() {
1686         if t.cl.config.DisableTrackers {
1687                 return
1688         }
1689         t.startScrapingTracker(t.metainfo.Announce)
1690         for _, tier := range t.metainfo.AnnounceList {
1691                 for _, url := range tier {
1692                         t.startScrapingTracker(url)
1693                 }
1694         }
1695 }
1696
1697 // Returns an AnnounceRequest with fields filled out to defaults and current
1698 // values.
1699 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1700         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1701         // dependent on the network in use.
1702         return tracker.AnnounceRequest{
1703                 Event: event,
1704                 NumWant: func() int32 {
1705                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1706                                 return -1
1707                         } else {
1708                                 return 0
1709                         }
1710                 }(),
1711                 Port:     uint16(t.cl.incomingPeerPort()),
1712                 PeerId:   t.cl.peerID,
1713                 InfoHash: t.infoHash,
1714                 Key:      t.cl.announceKey(),
1715
1716                 // The following are vaguely described in BEP 3.
1717
1718                 Left:     t.bytesLeftAnnounce(),
1719                 Uploaded: t.stats.BytesWrittenData.Int64(),
1720                 // There's no mention of wasted or unwanted download in the BEP.
1721                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1722         }
1723 }
1724
1725 // Adds peers revealed in an announce until the announce ends, or we have
1726 // enough peers.
1727 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1728         cl := t.cl
1729         for v := range pvs {
1730                 cl.lock()
1731                 added := 0
1732                 for _, cp := range v.Peers {
1733                         if cp.Port == 0 {
1734                                 // Can't do anything with this.
1735                                 continue
1736                         }
1737                         if t.addPeer(PeerInfo{
1738                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1739                                 Source: PeerSourceDhtGetPeers,
1740                         }) {
1741                                 added++
1742                         }
1743                 }
1744                 cl.unlock()
1745                 // if added != 0 {
1746                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1747                 // }
1748         }
1749 }
1750
1751 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1752 // announce ends. stop will force the announce to end.
1753 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1754         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1755         if err != nil {
1756                 return
1757         }
1758         _done := make(chan struct{})
1759         done = _done
1760         stop = ps.Close
1761         go func() {
1762                 t.consumeDhtAnnouncePeers(ps.Peers())
1763                 close(_done)
1764         }()
1765         return
1766 }
1767
1768 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1769         _, stop, err := t.AnnounceToDht(s)
1770         if err != nil {
1771                 return err
1772         }
1773         select {
1774         case <-t.closed.Done():
1775         case <-time.After(5 * time.Minute):
1776         }
1777         stop()
1778         return nil
1779 }
1780
1781 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1782         cl := t.cl
1783         cl.lock()
1784         defer cl.unlock()
1785         for {
1786                 for {
1787                         if t.closed.IsSet() {
1788                                 return
1789                         }
1790                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
1791                         // TODO: We can include the announce_peer step depending on whether we can receive
1792                         // inbound connections. We should probably only announce once every 15 mins too.
1793                         if !t.wantConns() {
1794                                 goto wait
1795                         }
1796                         // TODO: Determine if there's a listener on the port we're announcing.
1797                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1798                                 goto wait
1799                         }
1800                         break
1801                 wait:
1802                         cl.event.Wait()
1803                 }
1804                 func() {
1805                         t.numDHTAnnounces++
1806                         cl.unlock()
1807                         defer cl.lock()
1808                         err := t.timeboxedAnnounceToDht(s)
1809                         if err != nil {
1810                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1811                         }
1812                 }()
1813         }
1814 }
1815
1816 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1817         for _, p := range peers {
1818                 if t.addPeer(p) {
1819                         added++
1820                 }
1821         }
1822         return
1823 }
1824
1825 // The returned TorrentStats may require alignment in memory. See
1826 // https://github.com/anacrolix/torrent/issues/383.
1827 func (t *Torrent) Stats() TorrentStats {
1828         t.cl.rLock()
1829         defer t.cl.rUnlock()
1830         return t.statsLocked()
1831 }
1832
1833 func (t *Torrent) statsLocked() (ret TorrentStats) {
1834         ret.ActivePeers = len(t.conns)
1835         ret.HalfOpenPeers = len(t.halfOpen)
1836         ret.PendingPeers = t.peers.Len()
1837         ret.TotalPeers = t.numTotalPeers()
1838         ret.ConnectedSeeders = 0
1839         for c := range t.conns {
1840                 if all, ok := c.peerHasAllPieces(); all && ok {
1841                         ret.ConnectedSeeders++
1842                 }
1843         }
1844         ret.ConnStats = t.stats.Copy()
1845         ret.PiecesComplete = t.numPiecesCompleted()
1846         return
1847 }
1848
1849 // The total number of peers in the torrent.
1850 func (t *Torrent) numTotalPeers() int {
1851         peers := make(map[string]struct{})
1852         for conn := range t.conns {
1853                 ra := conn.conn.RemoteAddr()
1854                 if ra == nil {
1855                         // It's been closed and doesn't support RemoteAddr.
1856                         continue
1857                 }
1858                 peers[ra.String()] = struct{}{}
1859         }
1860         for addr := range t.halfOpen {
1861                 peers[addr] = struct{}{}
1862         }
1863         t.peers.Each(func(peer PeerInfo) {
1864                 peers[peer.Addr.String()] = struct{}{}
1865         })
1866         return len(peers)
1867 }
1868
1869 // Reconcile bytes transferred before connection was associated with a
1870 // torrent.
1871 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1872         if c._stats != (ConnStats{
1873                 // Handshakes should only increment these fields:
1874                 BytesWritten: c._stats.BytesWritten,
1875                 BytesRead:    c._stats.BytesRead,
1876         }) {
1877                 panic("bad stats")
1878         }
1879         c.postHandshakeStats(func(cs *ConnStats) {
1880                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1881                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1882         })
1883         c.reconciledHandshakeStats = true
1884 }
1885
1886 // Returns true if the connection is added.
1887 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1888         defer func() {
1889                 if err == nil {
1890                         torrent.Add("added connections", 1)
1891                 }
1892         }()
1893         if t.closed.IsSet() {
1894                 return errors.New("torrent closed")
1895         }
1896         for c0 := range t.conns {
1897                 if c.PeerID != c0.PeerID {
1898                         continue
1899                 }
1900                 if !t.cl.config.DropDuplicatePeerIds {
1901                         continue
1902                 }
1903                 if c.hasPreferredNetworkOver(c0) {
1904                         c0.close()
1905                         t.deletePeerConn(c0)
1906                 } else {
1907                         return errors.New("existing connection preferred")
1908                 }
1909         }
1910         if len(t.conns) >= t.maxEstablishedConns {
1911                 c := t.worstBadConn()
1912                 if c == nil {
1913                         return errors.New("don't want conns")
1914                 }
1915                 c.close()
1916                 t.deletePeerConn(c)
1917         }
1918         if len(t.conns) >= t.maxEstablishedConns {
1919                 panic(len(t.conns))
1920         }
1921         t.conns[c] = struct{}{}
1922         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
1923                 t.pex.Add(c) // as no further extended handshake expected
1924         }
1925         return nil
1926 }
1927
1928 func (t *Torrent) wantConns() bool {
1929         if !t.networkingEnabled.Bool() {
1930                 return false
1931         }
1932         if t.closed.IsSet() {
1933                 return false
1934         }
1935         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
1936                 return false
1937         }
1938         return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
1939 }
1940
1941 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1942         t.cl.lock()
1943         defer t.cl.unlock()
1944         oldMax = t.maxEstablishedConns
1945         t.maxEstablishedConns = max
1946         wcs := worseConnSlice{
1947                 conns: t.appendConns(nil, func(*PeerConn) bool {
1948                         return true
1949                 }),
1950         }
1951         wcs.initKeys()
1952         heap.Init(&wcs)
1953         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1954                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
1955         }
1956         t.openNewConns()
1957         return oldMax
1958 }
1959
1960 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
1961         t.logger.LazyLog(log.Debug, func() log.Msg {
1962                 return log.Fstr("hashed piece %d (passed=%t)", piece, passed)
1963         })
1964         p := t.piece(piece)
1965         p.numVerifies++
1966         t.cl.event.Broadcast()
1967         if t.closed.IsSet() {
1968                 return
1969         }
1970
1971         // Don't score the first time a piece is hashed, it could be an initial check.
1972         if p.storageCompletionOk {
1973                 if passed {
1974                         pieceHashedCorrect.Add(1)
1975                 } else {
1976                         log.Fmsg(
1977                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
1978                         ).AddValues(t, p).LogLevel(
1979
1980                                 log.Debug, t.logger)
1981
1982                         pieceHashedNotCorrect.Add(1)
1983                 }
1984         }
1985
1986         p.marking = true
1987         t.publishPieceChange(piece)
1988         defer func() {
1989                 p.marking = false
1990                 t.publishPieceChange(piece)
1991         }()
1992
1993         if passed {
1994                 if len(p.dirtiers) != 0 {
1995                         // Don't increment stats above connection-level for every involved connection.
1996                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1997                 }
1998                 for c := range p.dirtiers {
1999                         c._stats.incrementPiecesDirtiedGood()
2000                 }
2001                 t.clearPieceTouchers(piece)
2002                 t.cl.unlock()
2003                 err := p.Storage().MarkComplete()
2004                 if err != nil {
2005                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
2006                 }
2007                 t.cl.lock()
2008
2009                 if t.closed.IsSet() {
2010                         return
2011                 }
2012                 t.pendAllChunkSpecs(piece)
2013         } else {
2014                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
2015                         // Peers contributed to all the data for this piece hash failure, and the failure was
2016                         // not due to errors in the storage (such as data being dropped in a cache).
2017
2018                         // Increment Torrent and above stats, and then specific connections.
2019                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
2020                         for c := range p.dirtiers {
2021                                 // Y u do dis peer?!
2022                                 c.stats().incrementPiecesDirtiedBad()
2023                         }
2024
2025                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
2026                         for c := range p.dirtiers {
2027                                 if !c.trusted {
2028                                         bannableTouchers = append(bannableTouchers, c)
2029                                 }
2030                         }
2031                         t.clearPieceTouchers(piece)
2032                         slices.Sort(bannableTouchers, connLessTrusted)
2033
2034                         if t.cl.config.Debug {
2035                                 t.logger.Printf(
2036                                         "bannable conns by trust for piece %d: %v",
2037                                         piece,
2038                                         func() (ret []connectionTrust) {
2039                                                 for _, c := range bannableTouchers {
2040                                                         ret = append(ret, c.trust())
2041                                                 }
2042                                                 return
2043                                         }(),
2044                                 )
2045                         }
2046
2047                         if len(bannableTouchers) >= 1 {
2048                                 c := bannableTouchers[0]
2049                                 if len(bannableTouchers) != 1 {
2050                                         t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece)
2051                                 } else {
2052                                         // Turns out it's still useful to ban peers like this because if there's only a
2053                                         // single peer for a piece, and we never progress that piece to completion, we
2054                                         // will never smart-ban them. Discovered in
2055                                         // https://github.com/anacrolix/torrent/issues/715.
2056                                         t.logger.Levelf(log.Warning, "banning %v for being sole dirtier of piece %v after failed piece check", c, piece)
2057                                         c.ban()
2058                                 }
2059                         }
2060                 }
2061                 t.onIncompletePiece(piece)
2062                 p.Storage().MarkNotComplete()
2063         }
2064         t.updatePieceCompletion(piece)
2065 }
2066
2067 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
2068         for ri := t.pieceRequestIndexOffset(piece); ri < t.pieceRequestIndexOffset(piece+1); ri++ {
2069                 t.cancelRequest(ri)
2070         }
2071 }
2072
2073 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
2074         t.pendAllChunkSpecs(piece)
2075         t.cancelRequestsForPiece(piece)
2076         t.piece(piece).readerCond.Broadcast()
2077         for conn := range t.conns {
2078                 conn.have(piece)
2079                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
2080         }
2081 }
2082
2083 // Called when a piece is found to be not complete.
2084 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2085         if t.pieceAllDirty(piece) {
2086                 t.pendAllChunkSpecs(piece)
2087         }
2088         if !t.wantPieceIndex(piece) {
2089                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2090                 return
2091         }
2092         // We could drop any connections that we told we have a piece that we
2093         // don't here. But there's a test failure, and it seems clients don't care
2094         // if you request pieces that you already claim to have. Pruning bad
2095         // connections might just remove any connections that aren't treating us
2096         // favourably anyway.
2097
2098         // for c := range t.conns {
2099         //      if c.sentHave(piece) {
2100         //              c.drop()
2101         //      }
2102         // }
2103         t.iterPeers(func(conn *Peer) {
2104                 if conn.peerHasPiece(piece) {
2105                         conn.updateRequests("piece incomplete")
2106                 }
2107         })
2108 }
2109
2110 func (t *Torrent) tryCreateMorePieceHashers() {
2111         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
2112         }
2113 }
2114
2115 func (t *Torrent) tryCreatePieceHasher() bool {
2116         if t.storage == nil {
2117                 return false
2118         }
2119         pi, ok := t.getPieceToHash()
2120         if !ok {
2121                 return false
2122         }
2123         p := t.piece(pi)
2124         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2125         p.hashing = true
2126         t.publishPieceChange(pi)
2127         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2128         t.storageLock.RLock()
2129         t.activePieceHashes++
2130         go t.pieceHasher(pi)
2131         return true
2132 }
2133
2134 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2135         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2136                 if t.piece(i).hashing {
2137                         return true
2138                 }
2139                 ret = i
2140                 ok = true
2141                 return false
2142         })
2143         return
2144 }
2145
2146 func (t *Torrent) dropBannedPeers() {
2147         t.iterPeers(func(p *Peer) {
2148                 remoteIp := p.remoteIp()
2149                 if remoteIp == nil {
2150                         if p.bannableAddr.Ok() {
2151                                 log.Printf("can't get remote ip for peer %v", p)
2152                         }
2153                         return
2154                 }
2155                 netipAddr := netip.MustParseAddr(remoteIp.String())
2156                 if Some(netipAddr) != p.bannableAddr {
2157                         log.Printf(
2158                                 "peer remote ip does not match its bannable addr [peer=%v, remote ip=%v, bannable addr=%v]",
2159                                 p, remoteIp, p.bannableAddr)
2160                 }
2161                 if _, ok := t.cl.badPeerIPs[netipAddr]; ok {
2162                         // Should this be a close?
2163                         p.drop()
2164                         log.Printf("dropped %v for banned remote IP %v", p, netipAddr)
2165                 }
2166         })
2167 }
2168
2169 func (t *Torrent) pieceHasher(index pieceIndex) {
2170         p := t.piece(index)
2171         sum, failedPeers, copyErr := t.hashPiece(index)
2172         correct := sum == *p.hash
2173         switch copyErr {
2174         case nil, io.EOF:
2175         default:
2176                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2177         }
2178         t.storageLock.RUnlock()
2179         t.cl.lock()
2180         defer t.cl.unlock()
2181         if correct {
2182                 for peer := range failedPeers {
2183                         t.cl.banPeerIP(peer.AsSlice())
2184                         log.Printf("smart banned %v for piece %v", peer, index)
2185                 }
2186                 t.dropBannedPeers()
2187                 for ri := t.pieceRequestIndexOffset(index); ri < t.pieceRequestIndexOffset(index+1); ri++ {
2188                         t.smartBanCache.ForgetBlock(ri)
2189                 }
2190         }
2191         p.hashing = false
2192         t.pieceHashed(index, correct, copyErr)
2193         t.updatePiecePriority(index, "Torrent.pieceHasher")
2194         t.activePieceHashes--
2195         t.tryCreateMorePieceHashers()
2196 }
2197
2198 // Return the connections that touched a piece, and clear the entries while doing it.
2199 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2200         p := t.piece(pi)
2201         for c := range p.dirtiers {
2202                 delete(c.peerTouchedPieces, pi)
2203                 delete(p.dirtiers, c)
2204         }
2205 }
2206
2207 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2208         t.iterPeers(func(p *Peer) {
2209                 ret = append(ret, p)
2210         })
2211         return
2212 }
2213
2214 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2215         piece := t.piece(pieceIndex)
2216         if piece.queuedForHash() {
2217                 return
2218         }
2219         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2220         t.publishPieceChange(pieceIndex)
2221         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2222         t.tryCreateMorePieceHashers()
2223 }
2224
2225 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2226 // before the Info is available.
2227 func (t *Torrent) VerifyData() {
2228         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2229                 t.Piece(i).VerifyData()
2230         }
2231 }
2232
2233 // Start the process of connecting to the given peer for the given torrent if appropriate.
2234 func (t *Torrent) initiateConn(peer PeerInfo) {
2235         if peer.Id == t.cl.peerID {
2236                 return
2237         }
2238         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2239                 return
2240         }
2241         addr := peer.Addr
2242         if t.addrActive(addr.String()) {
2243                 return
2244         }
2245         t.cl.numHalfOpen++
2246         t.halfOpen[addr.String()] = peer
2247         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
2248 }
2249
2250 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2251 // quickly make one Client visible to the Torrent of another Client.
2252 func (t *Torrent) AddClientPeer(cl *Client) int {
2253         return t.AddPeers(func() (ps []PeerInfo) {
2254                 for _, la := range cl.ListenAddrs() {
2255                         ps = append(ps, PeerInfo{
2256                                 Addr:    la,
2257                                 Trusted: true,
2258                         })
2259                 }
2260                 return
2261         }())
2262 }
2263
2264 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2265 // connection.
2266 func (t *Torrent) allStats(f func(*ConnStats)) {
2267         f(&t.stats)
2268         f(&t.cl.stats)
2269 }
2270
2271 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2272         return t.pieces[i].hashing
2273 }
2274
2275 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2276         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2277 }
2278
2279 func (t *Torrent) dialTimeout() time.Duration {
2280         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2281 }
2282
2283 func (t *Torrent) piece(i int) *Piece {
2284         return &t.pieces[i]
2285 }
2286
2287 func (t *Torrent) onWriteChunkErr(err error) {
2288         if t.userOnWriteChunkErr != nil {
2289                 go t.userOnWriteChunkErr(err)
2290                 return
2291         }
2292         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2293         t.disallowDataDownloadLocked()
2294 }
2295
2296 func (t *Torrent) DisallowDataDownload() {
2297         t.disallowDataDownloadLocked()
2298 }
2299
2300 func (t *Torrent) disallowDataDownloadLocked() {
2301         t.dataDownloadDisallowed.Set()
2302 }
2303
2304 func (t *Torrent) AllowDataDownload() {
2305         t.dataDownloadDisallowed.Clear()
2306 }
2307
2308 // Enables uploading data, if it was disabled.
2309 func (t *Torrent) AllowDataUpload() {
2310         t.cl.lock()
2311         defer t.cl.unlock()
2312         t.dataUploadDisallowed = false
2313         for c := range t.conns {
2314                 c.updateRequests("allow data upload")
2315         }
2316 }
2317
2318 // Disables uploading data, if it was enabled.
2319 func (t *Torrent) DisallowDataUpload() {
2320         t.cl.lock()
2321         defer t.cl.unlock()
2322         t.dataUploadDisallowed = true
2323         for c := range t.conns {
2324                 // TODO: This doesn't look right. Shouldn't we tickle writers to choke peers or something instead?
2325                 c.updateRequests("disallow data upload")
2326         }
2327 }
2328
2329 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2330 // or if nil, a critical message is logged, and data download is disabled.
2331 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2332         t.cl.lock()
2333         defer t.cl.unlock()
2334         t.userOnWriteChunkErr = f
2335 }
2336
2337 func (t *Torrent) iterPeers(f func(p *Peer)) {
2338         for pc := range t.conns {
2339                 f(&pc.Peer)
2340         }
2341         for _, ws := range t.webSeeds {
2342                 f(ws)
2343         }
2344 }
2345
2346 func (t *Torrent) callbacks() *Callbacks {
2347         return &t.cl.config.Callbacks
2348 }
2349
2350 func (t *Torrent) AddWebSeeds(urls []string) {
2351         t.cl.lock()
2352         defer t.cl.unlock()
2353         for _, u := range urls {
2354                 t.addWebSeed(u)
2355         }
2356 }
2357
2358 func (t *Torrent) addWebSeed(url string) {
2359         if t.cl.config.DisableWebseeds {
2360                 return
2361         }
2362         if _, ok := t.webSeeds[url]; ok {
2363                 return
2364         }
2365         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2366         // right away. This value should be some multiple of the number of connections to a host. I
2367         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2368         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2369         // "https://webtorrent.io/torrents/".
2370         const maxRequests = 16
2371         ws := webseedPeer{
2372                 peer: Peer{
2373                         t:                        t,
2374                         outgoing:                 true,
2375                         Network:                  "http",
2376                         reconciledHandshakeStats: true,
2377                         // This should affect how often we have to recompute requests for this peer. Note that
2378                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2379                         // requests mark more often, so recomputation is probably sooner than with regular peer
2380                         // conns. ~4x maxRequests would be about right.
2381                         PeerMaxRequests: 128,
2382                         // TODO: Set ban prefix?
2383                         RemoteAddr: remoteAddrFromUrl(url),
2384                         callbacks:  t.callbacks(),
2385                 },
2386                 client: webseed.Client{
2387                         HttpClient: t.cl.httpClient,
2388                         Url:        url,
2389                         ResponseBodyWrapper: func(r io.Reader) io.Reader {
2390                                 return &rateLimitedReader{
2391                                         l: t.cl.config.DownloadRateLimiter,
2392                                         r: r,
2393                                 }
2394                         },
2395                 },
2396                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2397                 maxRequests:    maxRequests,
2398         }
2399         ws.peer.initUpdateRequestsTimer()
2400         ws.requesterCond.L = t.cl.locker()
2401         for i := 0; i < maxRequests; i += 1 {
2402                 go ws.requester(i)
2403         }
2404         for _, f := range t.callbacks().NewPeer {
2405                 f(&ws.peer)
2406         }
2407         ws.peer.logger = t.logger.WithContextValue(&ws)
2408         ws.peer.peerImpl = &ws
2409         if t.haveInfo() {
2410                 ws.onGotInfo(t.info)
2411         }
2412         t.webSeeds[url] = &ws.peer
2413 }
2414
2415 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2416         t.iterPeers(func(p1 *Peer) {
2417                 if p1 == p {
2418                         active = true
2419                 }
2420         })
2421         return
2422 }
2423
2424 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2425         index := ri / t.chunksPerRegularPiece()
2426         return Request{
2427                 pp.Integer(index),
2428                 t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2429         }
2430 }
2431
2432 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2433         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
2434 }
2435
2436 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2437         return RequestIndex(piece) * t.chunksPerRegularPiece()
2438 }
2439
2440 func (t *Torrent) updateComplete() {
2441         t.Complete.SetBool(t.haveAllPieces())
2442 }
2443
2444 func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
2445         p := t.pendingRequests[r]
2446         if p != nil {
2447                 p.cancel(r)
2448         }
2449         delete(t.pendingRequests, r)
2450         return p
2451 }
2452
2453 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
2454         return t.pendingRequests[r]
2455 }
2456
2457 func (t *Torrent) addConnWithAllPieces(p *Peer) {
2458         if t.connsWithAllPieces == nil {
2459                 t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
2460         }
2461         t.connsWithAllPieces[p] = struct{}{}
2462 }
2463
2464 func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
2465         _, ok := t.connsWithAllPieces[p]
2466         delete(t.connsWithAllPieces, p)
2467         return ok
2468 }
2469
2470 func (t *Torrent) numActivePeers() int {
2471         return len(t.conns) + len(t.webSeeds)
2472 }
2473
2474 func (t *Torrent) hasStorageCap() bool {
2475         f := t.storage.Capacity
2476         if f == nil {
2477                 return false
2478         }
2479         _, ok := (*f)()
2480         return ok
2481 }