]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Implement piece request ordering with retained state
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "net/url"
12         "sort"
13         "strings"
14         "text/tabwriter"
15         "time"
16         "unsafe"
17
18         "github.com/RoaringBitmap/roaring"
19         "github.com/anacrolix/chansync"
20         "github.com/anacrolix/chansync/events"
21         "github.com/anacrolix/dht/v2"
22         "github.com/anacrolix/log"
23         "github.com/anacrolix/missinggo/perf"
24         "github.com/anacrolix/missinggo/pubsub"
25         "github.com/anacrolix/missinggo/slices"
26         "github.com/anacrolix/missinggo/v2"
27         "github.com/anacrolix/missinggo/v2/bitmap"
28         "github.com/anacrolix/multiless"
29         "github.com/anacrolix/sync"
30         request_strategy "github.com/anacrolix/torrent/request-strategy"
31         "github.com/davecgh/go-spew/spew"
32         "github.com/pion/datachannel"
33
34         "github.com/anacrolix/torrent/bencode"
35         "github.com/anacrolix/torrent/common"
36         "github.com/anacrolix/torrent/metainfo"
37         pp "github.com/anacrolix/torrent/peer_protocol"
38         "github.com/anacrolix/torrent/segments"
39         "github.com/anacrolix/torrent/storage"
40         "github.com/anacrolix/torrent/tracker"
41         "github.com/anacrolix/torrent/webseed"
42         "github.com/anacrolix/torrent/webtorrent"
43 )
44
45 // Maintains state of torrent within a Client. Many methods should not be called before the info is
46 // available, see .Info and .GotInfo.
47 type Torrent struct {
48         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
49         // alignment. See #262.
50         stats  ConnStats
51         cl     *Client
52         logger log.Logger
53
54         networkingEnabled      chansync.Flag
55         dataDownloadDisallowed chansync.Flag
56         dataUploadDisallowed   bool
57         userOnWriteChunkErr    func(error)
58
59         closed   chansync.SetOnce
60         infoHash metainfo.Hash
61         pieces   []Piece
62         // Values are the piece indices that changed.
63         pieceStateChanges *pubsub.PubSub
64         // The size of chunks to request from peers over the wire. This is
65         // normally 16KiB by convention these days.
66         chunkSize pp.Integer
67         chunkPool sync.Pool
68         // Total length of the torrent in bytes. Stored because it's not O(1) to
69         // get this from the info dict.
70         length *int64
71
72         // The storage to open when the info dict becomes available.
73         storageOpener *storage.Client
74         // Storage for torrent data.
75         storage *storage.Torrent
76         // Read-locked for using storage, and write-locked for Closing.
77         storageLock sync.RWMutex
78
79         // TODO: Only announce stuff is used?
80         metainfo metainfo.MetaInfo
81
82         // The info dict. nil if we don't have it (yet).
83         info      *metainfo.Info
84         fileIndex segments.Index
85         files     *[]*File
86
87         webSeeds map[string]*Peer
88
89         // Active peer connections, running message stream loops. TODO: Make this
90         // open (not-closed) connections only.
91         conns               map[*PeerConn]struct{}
92         maxEstablishedConns int
93         // Set of addrs to which we're attempting to connect. Connections are
94         // half-open until all handshakes are completed.
95         halfOpen map[string]PeerInfo
96
97         // Reserve of peers to connect to. A peer can be both here and in the
98         // active connections if were told about the peer after connecting with
99         // them. That encourages us to reconnect to peers that are well known in
100         // the swarm.
101         peers prioritizedPeers
102         // Whether we want to know to know more peers.
103         wantPeersEvent missinggo.Event
104         // An announcer for each tracker URL.
105         trackerAnnouncers map[string]torrentTrackerAnnouncer
106         // How many times we've initiated a DHT announce. TODO: Move into stats.
107         numDHTAnnounces int
108
109         // Name used if the info name isn't available. Should be cleared when the
110         // Info does become available.
111         nameMu      sync.RWMutex
112         displayName string
113
114         // The bencoded bytes of the info dict. This is actively manipulated if
115         // the info bytes aren't initially available, and we try to fetch them
116         // from peers.
117         metadataBytes []byte
118         // Each element corresponds to the 16KiB metadata pieces. If true, we have
119         // received that piece.
120         metadataCompletedChunks []bool
121         metadataChanged         sync.Cond
122
123         // Closed when .Info is obtained.
124         gotMetainfoC chan struct{}
125
126         readers                map[*reader]struct{}
127         _readerNowPieces       bitmap.Bitmap
128         _readerReadaheadPieces bitmap.Bitmap
129
130         // A cache of pieces we need to get. Calculated from various piece and
131         // file priorities and completion states elsewhere.
132         _pendingPieces roaring.Bitmap
133         // A cache of completed piece indices.
134         _completedPieces roaring.Bitmap
135         // Pieces that need to be hashed.
136         piecesQueuedForHash       bitmap.Bitmap
137         activePieceHashes         int
138         initialPieceCheckDisabled bool
139
140         // Count of each request across active connections.
141         pendingRequests pendingRequests
142         // Chunks we've written to since the corresponding piece was last checked.
143         dirtyChunks roaring.Bitmap
144
145         pex pexState
146
147         // Is On when all pieces are complete.
148         Complete chansync.Flag
149 }
150
151 func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
152         t.iterPeers(func(peer *Peer) {
153                 if peer.peerHasPiece(i) {
154                         count++
155                 }
156         })
157         return
158 }
159
160 func (t *Torrent) decPieceAvailability(i pieceIndex) {
161         if !t.haveInfo() {
162                 return
163         }
164         p := t.piece(i)
165         if p.availability <= 0 {
166                 panic(p.availability)
167         }
168         p.availability--
169         t.updatePieceRequestOrder(i)
170 }
171
172 func (t *Torrent) incPieceAvailability(i pieceIndex) {
173         // If we don't the info, this should be reconciled when we do.
174         if t.haveInfo() {
175                 p := t.piece(i)
176                 p.availability++
177                 t.updatePieceRequestOrder(i)
178         }
179 }
180
181 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
182         return t._readerNowPieces
183 }
184
185 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
186         return t._readerReadaheadPieces
187 }
188
189 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
190         return !t.wantPieceIndex(i)
191 }
192
193 // Returns a channel that is closed when the Torrent is closed.
194 func (t *Torrent) Closed() events.Done {
195         return t.closed.Done()
196 }
197
198 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
199 // pending, and half-open peers.
200 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
201         // Add pending peers to the list
202         t.peers.Each(func(peer PeerInfo) {
203                 ks = append(ks, peer)
204         })
205
206         // Add half-open peers to the list
207         for _, peer := range t.halfOpen {
208                 ks = append(ks, peer)
209         }
210
211         // Add active peers to the list
212         for conn := range t.conns {
213                 ks = append(ks, PeerInfo{
214                         Id:     conn.PeerID,
215                         Addr:   conn.RemoteAddr,
216                         Source: conn.Discovery,
217                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
218                         // > But if we're not connected to them with an encrypted connection, I couldn't say
219                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
220                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
221                         // > connection. It's probably easiest to do the latter for now.
222                         // https://github.com/anacrolix/torrent/pull/188
223                         SupportsEncryption: conn.headerEncrypted,
224                 })
225         }
226
227         return
228 }
229
230 func (t *Torrent) setChunkSize(size pp.Integer) {
231         t.chunkSize = size
232         t.chunkPool = sync.Pool{
233                 New: func() interface{} {
234                         b := make([]byte, size)
235                         return &b
236                 },
237         }
238 }
239
240 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
241         return t._completedPieces.Contains(bitmap.BitIndex(piece))
242 }
243
244 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
245         return t.pieces[piece].Storage().Completion()
246 }
247
248 // There's a connection to that address already.
249 func (t *Torrent) addrActive(addr string) bool {
250         if _, ok := t.halfOpen[addr]; ok {
251                 return true
252         }
253         for c := range t.conns {
254                 ra := c.RemoteAddr
255                 if ra.String() == addr {
256                         return true
257                 }
258         }
259         return false
260 }
261
262 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
263         return t.appendConns(ret, func(conn *PeerConn) bool {
264                 return !conn.closed.IsSet()
265         })
266 }
267
268 func (t *Torrent) appendConns(ret []*PeerConn, f func(*PeerConn) bool) []*PeerConn {
269         for c := range t.conns {
270                 if f(c) {
271                         ret = append(ret, c)
272                 }
273         }
274         return ret
275 }
276
277 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
278         cl := t.cl
279         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
280         if t.closed.IsSet() {
281                 return false
282         }
283         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
284                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
285                         torrent.Add("peers not added because of bad addr", 1)
286                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
287                         return false
288                 }
289         }
290         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
291                 torrent.Add("peers replaced", 1)
292                 if !replaced.equal(p) {
293                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
294                         added = true
295                 }
296         } else {
297                 added = true
298         }
299         t.openNewConns()
300         for t.peers.Len() > cl.config.TorrentPeersHighWater {
301                 _, ok := t.peers.DeleteMin()
302                 if ok {
303                         torrent.Add("excess reserve peers discarded", 1)
304                 }
305         }
306         return
307 }
308
309 func (t *Torrent) invalidateMetadata() {
310         for i := 0; i < len(t.metadataCompletedChunks); i++ {
311                 t.metadataCompletedChunks[i] = false
312         }
313         t.nameMu.Lock()
314         t.gotMetainfoC = make(chan struct{})
315         t.info = nil
316         t.nameMu.Unlock()
317 }
318
319 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
320         if t.haveInfo() {
321                 return
322         }
323         if index >= len(t.metadataCompletedChunks) {
324                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
325                 return
326         }
327         copy(t.metadataBytes[(1<<14)*index:], data)
328         t.metadataCompletedChunks[index] = true
329 }
330
331 func (t *Torrent) metadataPieceCount() int {
332         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
333 }
334
335 func (t *Torrent) haveMetadataPiece(piece int) bool {
336         if t.haveInfo() {
337                 return (1<<14)*piece < len(t.metadataBytes)
338         } else {
339                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
340         }
341 }
342
343 func (t *Torrent) metadataSize() int {
344         return len(t.metadataBytes)
345 }
346
347 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
348         for i := 0; i < len(info.Pieces); i += sha1.Size {
349                 ret = append(ret, info.Pieces[i:i+sha1.Size])
350         }
351         return
352 }
353
354 func (t *Torrent) makePieces() {
355         hashes := infoPieceHashes(t.info)
356         t.pieces = make([]Piece, len(hashes))
357         for i, hash := range hashes {
358                 piece := &t.pieces[i]
359                 piece.t = t
360                 piece.index = pieceIndex(i)
361                 piece.noPendingWrites.L = &piece.pendingWritesMutex
362                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
363                 files := *t.files
364                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
365                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
366                 piece.files = files[beginFile:endFile]
367                 piece.undirtiedChunksIter = undirtiedChunksIter{
368                         TorrentDirtyChunks: &t.dirtyChunks,
369                         StartRequestIndex:  piece.requestIndexOffset(),
370                         EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
371                 }
372         }
373 }
374
375 // Returns the index of the first file containing the piece. files must be
376 // ordered by offset.
377 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
378         for i, f := range files {
379                 if f.offset+f.length > pieceOffset {
380                         return i
381                 }
382         }
383         return 0
384 }
385
386 // Returns the index after the last file containing the piece. files must be
387 // ordered by offset.
388 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
389         for i, f := range files {
390                 if f.offset+f.length >= pieceEndOffset {
391                         return i + 1
392                 }
393         }
394         return 0
395 }
396
397 func (t *Torrent) cacheLength() {
398         var l int64
399         for _, f := range t.info.UpvertedFiles() {
400                 l += f.Length
401         }
402         t.length = &l
403 }
404
405 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
406 // separately.
407 func (t *Torrent) setInfo(info *metainfo.Info) error {
408         if err := validateInfo(info); err != nil {
409                 return fmt.Errorf("bad info: %s", err)
410         }
411         if t.storageOpener != nil {
412                 var err error
413                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
414                 if err != nil {
415                         return fmt.Errorf("error opening torrent storage: %s", err)
416                 }
417         }
418         t.nameMu.Lock()
419         t.info = info
420         t.nameMu.Unlock()
421         t.updateComplete()
422         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
423         t.displayName = "" // Save a few bytes lol.
424         t.initFiles()
425         t.cacheLength()
426         t.makePieces()
427         return nil
428 }
429
430 func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
431         return request_strategy.PieceRequestOrderKey{
432                 InfoHash: t.infoHash,
433                 Index:    i,
434         }
435 }
436
437 // This seems to be all the follow-up tasks after info is set, that can't fail.
438 func (t *Torrent) onSetInfo() {
439         if t.cl.pieceRequestOrder == nil {
440                 t.cl.pieceRequestOrder = make(map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder)
441         }
442         if t.cl.pieceRequestOrder[t.storage.Capacity] == nil {
443                 t.cl.pieceRequestOrder[t.storage.Capacity] = request_strategy.NewPieceOrder()
444         }
445         for i := range t.pieces {
446                 p := &t.pieces[i]
447                 // Need to add availability before updating piece completion, as that may result in conns
448                 // being dropped.
449                 if p.availability != 0 {
450                         panic(p.availability)
451                 }
452                 p.availability = int64(t.pieceAvailabilityFromPeers(i))
453                 t.cl.pieceRequestOrder[t.storage.Capacity].Add(
454                         t.pieceRequestOrderKey(i),
455                         t.requestStrategyPieceOrderState(i))
456                 t.updatePieceCompletion(pieceIndex(i))
457                 if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
458                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
459                         t.queuePieceCheck(pieceIndex(i))
460                 }
461         }
462         t.cl.event.Broadcast()
463         close(t.gotMetainfoC)
464         t.updateWantPeersEvent()
465         t.pendingRequests.Init(t.numRequests())
466         t.tryCreateMorePieceHashers()
467         t.iterPeers(func(p *Peer) {
468                 p.onGotInfo(t.info)
469                 p.updateRequests("onSetInfo")
470         })
471 }
472
473 // Called when metadata for a torrent becomes available.
474 func (t *Torrent) setInfoBytesLocked(b []byte) error {
475         if metainfo.HashBytes(b) != t.infoHash {
476                 return errors.New("info bytes have wrong hash")
477         }
478         var info metainfo.Info
479         if err := bencode.Unmarshal(b, &info); err != nil {
480                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
481         }
482         t.metadataBytes = b
483         t.metadataCompletedChunks = nil
484         if t.info != nil {
485                 return nil
486         }
487         if err := t.setInfo(&info); err != nil {
488                 return err
489         }
490         t.onSetInfo()
491         return nil
492 }
493
494 func (t *Torrent) haveAllMetadataPieces() bool {
495         if t.haveInfo() {
496                 return true
497         }
498         if t.metadataCompletedChunks == nil {
499                 return false
500         }
501         for _, have := range t.metadataCompletedChunks {
502                 if !have {
503                         return false
504                 }
505         }
506         return true
507 }
508
509 // TODO: Propagate errors to disconnect peer.
510 func (t *Torrent) setMetadataSize(size int) (err error) {
511         if t.haveInfo() {
512                 // We already know the correct metadata size.
513                 return
514         }
515         if uint32(size) > maxMetadataSize {
516                 return errors.New("bad size")
517         }
518         if len(t.metadataBytes) == size {
519                 return
520         }
521         t.metadataBytes = make([]byte, size)
522         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
523         t.metadataChanged.Broadcast()
524         for c := range t.conns {
525                 c.requestPendingMetadata()
526         }
527         return
528 }
529
530 // The current working name for the torrent. Either the name in the info dict,
531 // or a display name given such as by the dn value in a magnet link, or "".
532 func (t *Torrent) name() string {
533         t.nameMu.RLock()
534         defer t.nameMu.RUnlock()
535         if t.haveInfo() {
536                 return t.info.Name
537         }
538         if t.displayName != "" {
539                 return t.displayName
540         }
541         return "infohash:" + t.infoHash.HexString()
542 }
543
544 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
545         p := &t.pieces[index]
546         ret.Priority = t.piecePriority(index)
547         ret.Completion = p.completion()
548         ret.QueuedForHash = p.queuedForHash()
549         ret.Hashing = p.hashing
550         ret.Checking = ret.QueuedForHash || ret.Hashing
551         ret.Marking = p.marking
552         if !ret.Complete && t.piecePartiallyDownloaded(index) {
553                 ret.Partial = true
554         }
555         return
556 }
557
558 func (t *Torrent) metadataPieceSize(piece int) int {
559         return metadataPieceSize(len(t.metadataBytes), piece)
560 }
561
562 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
563         return pp.Message{
564                 Type:       pp.Extended,
565                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
566                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
567                         Piece:     piece,
568                         TotalSize: len(t.metadataBytes),
569                         Type:      msgType,
570                 }), data...),
571         }
572 }
573
574 type pieceAvailabilityRun struct {
575         count        pieceIndex
576         availability int64
577 }
578
579 func (me pieceAvailabilityRun) String() string {
580         return fmt.Sprintf("%v(%v)", me.count, me.availability)
581 }
582
583 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
584         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
585                 ret = append(ret, pieceAvailabilityRun{availability: el.(int64), count: int(count)})
586         })
587         for i := range t.pieces {
588                 rle.Append(t.pieces[i].availability, 1)
589         }
590         rle.Flush()
591         return
592 }
593
594 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
595         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
596                 ret = append(ret, PieceStateRun{
597                         PieceState: el.(PieceState),
598                         Length:     int(count),
599                 })
600         })
601         for index := range t.pieces {
602                 rle.Append(t.pieceState(pieceIndex(index)), 1)
603         }
604         rle.Flush()
605         return
606 }
607
608 // Produces a small string representing a PieceStateRun.
609 func (psr PieceStateRun) String() (ret string) {
610         ret = fmt.Sprintf("%d", psr.Length)
611         ret += func() string {
612                 switch psr.Priority {
613                 case PiecePriorityNext:
614                         return "N"
615                 case PiecePriorityNormal:
616                         return "."
617                 case PiecePriorityReadahead:
618                         return "R"
619                 case PiecePriorityNow:
620                         return "!"
621                 case PiecePriorityHigh:
622                         return "H"
623                 default:
624                         return ""
625                 }
626         }()
627         if psr.Hashing {
628                 ret += "H"
629         }
630         if psr.QueuedForHash {
631                 ret += "Q"
632         }
633         if psr.Marking {
634                 ret += "M"
635         }
636         if psr.Partial {
637                 ret += "P"
638         }
639         if psr.Complete {
640                 ret += "C"
641         }
642         if !psr.Ok {
643                 ret += "?"
644         }
645         return
646 }
647
648 func (t *Torrent) writeStatus(w io.Writer) {
649         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
650         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
651         if !t.haveInfo() {
652                 fmt.Fprintf(w, "Metadata have: ")
653                 for _, h := range t.metadataCompletedChunks {
654                         fmt.Fprintf(w, "%c", func() rune {
655                                 if h {
656                                         return 'H'
657                                 } else {
658                                         return '.'
659                                 }
660                         }())
661                 }
662                 fmt.Fprintln(w)
663         }
664         fmt.Fprintf(w, "Piece length: %s\n",
665                 func() string {
666                         if t.haveInfo() {
667                                 return fmt.Sprintf("%v (%v chunks)",
668                                         t.usualPieceSize(),
669                                         float64(t.usualPieceSize())/float64(t.chunkSize))
670                         } else {
671                                 return "no info"
672                         }
673                 }(),
674         )
675         if t.info != nil {
676                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
677                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
678                 fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
679                         for _, run := range t.pieceAvailabilityRuns() {
680                                 ret = append(ret, run.String())
681                         }
682                         return
683                 }(), " "))
684         }
685         fmt.Fprintf(w, "Reader Pieces:")
686         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
687                 fmt.Fprintf(w, " %d:%d", begin, end)
688                 return true
689         })
690         fmt.Fprintln(w)
691
692         fmt.Fprintf(w, "Enabled trackers:\n")
693         func() {
694                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
695                 fmt.Fprintf(tw, "    URL\tExtra\n")
696                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
697                         lu := l.URL()
698                         ru := r.URL()
699                         var luns, runs url.URL = *lu, *ru
700                         luns.Scheme = ""
701                         runs.Scheme = ""
702                         var ml missinggo.MultiLess
703                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
704                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
705                         return ml.Less()
706                 }).([]torrentTrackerAnnouncer) {
707                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
708                 }
709                 tw.Flush()
710         }()
711
712         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
713
714         spew.NewDefaultConfig()
715         spew.Fdump(w, t.statsLocked())
716
717         peers := t.peersAsSlice()
718         sort.Slice(peers, func(_i, _j int) bool {
719                 i := peers[_i]
720                 j := peers[_j]
721                 if less, ok := multiless.New().EagerSameLess(
722                         i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
723                 ).LessOk(); ok {
724                         return less
725                 }
726                 return worseConn(i, j)
727         })
728         for i, c := range peers {
729                 fmt.Fprintf(w, "%2d. ", i+1)
730                 c.writeStatus(w, t)
731         }
732 }
733
734 func (t *Torrent) haveInfo() bool {
735         return t.info != nil
736 }
737
738 // Returns a run-time generated MetaInfo that includes the info bytes and
739 // announce-list as currently known to the client.
740 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
741         return metainfo.MetaInfo{
742                 CreationDate: time.Now().Unix(),
743                 Comment:      "dynamic metainfo from client",
744                 CreatedBy:    "go.torrent",
745                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
746                 InfoBytes: func() []byte {
747                         if t.haveInfo() {
748                                 return t.metadataBytes
749                         } else {
750                                 return nil
751                         }
752                 }(),
753                 UrlList: func() []string {
754                         ret := make([]string, 0, len(t.webSeeds))
755                         for url := range t.webSeeds {
756                                 ret = append(ret, url)
757                         }
758                         return ret
759                 }(),
760         }
761 }
762
763 // Get bytes left
764 func (t *Torrent) BytesMissing() (n int64) {
765         t.cl.rLock()
766         n = t.bytesMissingLocked()
767         t.cl.rUnlock()
768         return
769 }
770
771 func (t *Torrent) bytesMissingLocked() int64 {
772         return t.bytesLeft()
773 }
774
775 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
776         roaring.Flip(b, 0, end).Iterate(cb)
777 }
778
779 func (t *Torrent) bytesLeft() (left int64) {
780         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
781                 p := t.piece(pieceIndex(x))
782                 left += int64(p.length() - p.numDirtyBytes())
783                 return true
784         })
785         return
786 }
787
788 // Bytes left to give in tracker announces.
789 func (t *Torrent) bytesLeftAnnounce() int64 {
790         if t.haveInfo() {
791                 return t.bytesLeft()
792         } else {
793                 return -1
794         }
795 }
796
797 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
798         if t.pieceComplete(piece) {
799                 return false
800         }
801         if t.pieceAllDirty(piece) {
802                 return false
803         }
804         return t.pieces[piece].hasDirtyChunks()
805 }
806
807 func (t *Torrent) usualPieceSize() int {
808         return int(t.info.PieceLength)
809 }
810
811 func (t *Torrent) numPieces() pieceIndex {
812         return pieceIndex(t.info.NumPieces())
813 }
814
815 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
816         return pieceIndex(t._completedPieces.GetCardinality())
817 }
818
819 func (t *Torrent) deletePieceRequestOrder() {
820         for i := 0; i < t.numPieces(); i++ {
821                 t.cl.pieceRequestOrder[t.storage.Capacity].Delete(t.pieceRequestOrderKey(i))
822         }
823 }
824
825 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
826         t.closed.Set()
827         if t.storage != nil {
828                 wg.Add(1)
829                 go func() {
830                         defer wg.Done()
831                         t.storageLock.Lock()
832                         defer t.storageLock.Unlock()
833                         if f := t.storage.Close; f != nil {
834                                 err1 := f()
835                                 if err1 != nil {
836                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
837                                 }
838                         }
839                 }()
840         }
841         t.iterPeers(func(p *Peer) {
842                 p.close()
843         })
844         if t.storage != nil {
845                 t.deletePieceRequestOrder()
846         }
847         t.pex.Reset()
848         t.cl.event.Broadcast()
849         t.pieceStateChanges.Close()
850         t.updateWantPeersEvent()
851         return
852 }
853
854 func (t *Torrent) requestOffset(r Request) int64 {
855         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
856 }
857
858 // Return the request that would include the given offset into the torrent data. Returns !ok if
859 // there is no such request.
860 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
861         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
862 }
863
864 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
865         defer perf.ScopeTimerErr(&err)()
866         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
867         if err == nil && n != len(data) {
868                 err = io.ErrShortWrite
869         }
870         return err
871 }
872
873 func (t *Torrent) bitfield() (bf []bool) {
874         bf = make([]bool, t.numPieces())
875         t._completedPieces.Iterate(func(piece uint32) (again bool) {
876                 bf[piece] = true
877                 return true
878         })
879         return
880 }
881
882 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
883         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
884 }
885
886 func (t *Torrent) chunksPerRegularPiece() uint32 {
887         return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
888 }
889
890 func (t *Torrent) numRequests() RequestIndex {
891         if t.numPieces() == 0 {
892                 return 0
893         }
894         return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
895 }
896
897 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
898         t.dirtyChunks.RemoveRange(
899                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
900                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
901 }
902
903 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
904         if t.info.PieceLength == 0 {
905                 // There will be no variance amongst pieces. Only pain.
906                 return 0
907         }
908         if piece == t.numPieces()-1 {
909                 ret := pp.Integer(*t.length % t.info.PieceLength)
910                 if ret != 0 {
911                         return ret
912                 }
913         }
914         return pp.Integer(t.info.PieceLength)
915 }
916
917 func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
918         p := t.piece(piece)
919         p.waitNoPendingWrites()
920         storagePiece := t.pieces[piece].Storage()
921
922         // Does the backend want to do its own hashing?
923         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
924                 var sum metainfo.Hash
925                 // log.Printf("A piece decided to self-hash: %d", piece)
926                 sum, err = i.SelfHash()
927                 missinggo.CopyExact(&ret, sum)
928                 return
929         }
930
931         hash := pieceHash.New()
932         const logPieceContents = false
933         if logPieceContents {
934                 var examineBuf bytes.Buffer
935                 _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
936                 log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
937         } else {
938                 _, err = storagePiece.WriteTo(hash)
939         }
940         missinggo.CopyExact(&ret, hash.Sum(nil))
941         return
942 }
943
944 func (t *Torrent) haveAnyPieces() bool {
945         return !t._completedPieces.IsEmpty()
946 }
947
948 func (t *Torrent) haveAllPieces() bool {
949         if !t.haveInfo() {
950                 return false
951         }
952         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
953 }
954
955 func (t *Torrent) havePiece(index pieceIndex) bool {
956         return t.haveInfo() && t.pieceComplete(index)
957 }
958
959 func (t *Torrent) maybeDropMutuallyCompletePeer(
960         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
961         // okay?
962         p *Peer,
963 ) {
964         if !t.cl.config.DropMutuallyCompletePeers {
965                 return
966         }
967         if !t.haveAllPieces() {
968                 return
969         }
970         if all, known := p.peerHasAllPieces(); !(known && all) {
971                 return
972         }
973         if p.useful() {
974                 return
975         }
976         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
977         p.drop()
978 }
979
980 func (t *Torrent) haveChunk(r Request) (ret bool) {
981         // defer func() {
982         //      log.Println("have chunk", r, ret)
983         // }()
984         if !t.haveInfo() {
985                 return false
986         }
987         if t.pieceComplete(pieceIndex(r.Index)) {
988                 return true
989         }
990         p := &t.pieces[r.Index]
991         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
992 }
993
994 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
995         return chunkIndexType(cs.Begin / chunkSize)
996 }
997
998 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
999         return t._pendingPieces.Contains(uint32(index))
1000 }
1001
1002 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
1003 // conns (which is a map).
1004 var peerConnSlices sync.Pool
1005
1006 func getPeerConnSlice(cap int) []*PeerConn {
1007         getInterface := peerConnSlices.Get()
1008         if getInterface == nil {
1009                 return make([]*PeerConn, 0, cap)
1010         } else {
1011                 return getInterface.([]*PeerConn)[:0]
1012         }
1013 }
1014
1015 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
1016 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
1017 // established connections for more than a minute. This is O(n log n). If there was a way to not
1018 // consider the position of a conn relative to the total number, it could be reduced to O(n).
1019 func (t *Torrent) worstBadConn() (ret *PeerConn) {
1020         wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
1021         defer peerConnSlices.Put(wcs.conns)
1022         wcs.initKeys()
1023         heap.Init(&wcs)
1024         for wcs.Len() != 0 {
1025                 c := heap.Pop(&wcs).(*PeerConn)
1026                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1027                         return c
1028                 }
1029                 // If the connection is in the worst half of the established
1030                 // connection quota and is older than a minute.
1031                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1032                         // Give connections 1 minute to prove themselves.
1033                         if time.Since(c.completedHandshake) > time.Minute {
1034                                 return c
1035                         }
1036                 }
1037         }
1038         return nil
1039 }
1040
1041 type PieceStateChange struct {
1042         Index int
1043         PieceState
1044 }
1045
1046 func (t *Torrent) publishPieceChange(piece pieceIndex) {
1047         t.cl._mu.Defer(func() {
1048                 cur := t.pieceState(piece)
1049                 p := &t.pieces[piece]
1050                 if cur != p.publicPieceState {
1051                         p.publicPieceState = cur
1052                         t.pieceStateChanges.Publish(PieceStateChange{
1053                                 int(piece),
1054                                 cur,
1055                         })
1056                 }
1057         })
1058 }
1059
1060 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1061         if t.pieceComplete(piece) {
1062                 return 0
1063         }
1064         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1065 }
1066
1067 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1068         return t.pieces[piece].allChunksDirty()
1069 }
1070
1071 func (t *Torrent) readersChanged() {
1072         t.updateReaderPieces()
1073         t.updateAllPiecePriorities("Torrent.readersChanged")
1074 }
1075
1076 func (t *Torrent) updateReaderPieces() {
1077         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1078 }
1079
1080 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1081         if from == to {
1082                 return
1083         }
1084         t.updateReaderPieces()
1085         // Order the ranges, high and low.
1086         l, h := from, to
1087         if l.begin > h.begin {
1088                 l, h = h, l
1089         }
1090         if l.end < h.begin {
1091                 // Two distinct ranges.
1092                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1093                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1094         } else {
1095                 // Ranges overlap.
1096                 end := l.end
1097                 if h.end > end {
1098                         end = h.end
1099                 }
1100                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1101         }
1102 }
1103
1104 func (t *Torrent) maybeNewConns() {
1105         // Tickle the accept routine.
1106         t.cl.event.Broadcast()
1107         t.openNewConns()
1108 }
1109
1110 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
1111         if t._pendingPieces.Contains(uint32(piece)) {
1112                 t.iterPeers(func(c *Peer) {
1113                         if c.actualRequestState.Interested {
1114                                 return
1115                         }
1116                         if !c.isLowOnRequests() {
1117                                 return
1118                         }
1119                         if !c.peerHasPiece(piece) {
1120                                 return
1121                         }
1122                         if c.peerChoking && !c.peerAllowedFast.Contains(uint32(piece)) {
1123                                 return
1124                         }
1125                         c.updateRequests(reason)
1126                 })
1127         }
1128         t.maybeNewConns()
1129         t.publishPieceChange(piece)
1130 }
1131
1132 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1133         if !t.closed.IsSet() {
1134                 // It would be possible to filter on pure-priority changes here to avoid churning the piece
1135                 // request order.
1136                 t.updatePieceRequestOrder(piece)
1137         }
1138         p := &t.pieces[piece]
1139         newPrio := p.uncachedPriority()
1140         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1141         if newPrio == PiecePriorityNone {
1142                 if !t._pendingPieces.CheckedRemove(uint32(piece)) {
1143                         return
1144                 }
1145         } else {
1146                 if !t._pendingPieces.CheckedAdd(uint32(piece)) {
1147                         return
1148                 }
1149         }
1150         t.piecePriorityChanged(piece, reason)
1151 }
1152
1153 func (t *Torrent) updateAllPiecePriorities(reason string) {
1154         t.updatePiecePriorities(0, t.numPieces(), reason)
1155 }
1156
1157 // Update all piece priorities in one hit. This function should have the same
1158 // output as updatePiecePriority, but across all pieces.
1159 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1160         for i := begin; i < end; i++ {
1161                 t.updatePiecePriority(i, reason)
1162         }
1163 }
1164
1165 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1166 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1167         if off >= *t.length {
1168                 return
1169         }
1170         if off < 0 {
1171                 size += off
1172                 off = 0
1173         }
1174         if size <= 0 {
1175                 return
1176         }
1177         begin = pieceIndex(off / t.info.PieceLength)
1178         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1179         if end > pieceIndex(t.info.NumPieces()) {
1180                 end = pieceIndex(t.info.NumPieces())
1181         }
1182         return
1183 }
1184
1185 // Returns true if all iterations complete without breaking. Returns the read regions for all
1186 // readers. The reader regions should not be merged as some callers depend on this method to
1187 // enumerate readers.
1188 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1189         for r := range t.readers {
1190                 p := r.pieces
1191                 if p.begin >= p.end {
1192                         continue
1193                 }
1194                 if !f(p.begin, p.end) {
1195                         return false
1196                 }
1197         }
1198         return true
1199 }
1200
1201 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1202         return t.piece(piece).uncachedPriority()
1203 }
1204
1205 func (t *Torrent) pendRequest(req RequestIndex) {
1206         t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
1207 }
1208
1209 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1210         t.cl.event.Broadcast()
1211         if t.pieceComplete(piece) {
1212                 t.onPieceCompleted(piece)
1213         } else {
1214                 t.onIncompletePiece(piece)
1215         }
1216         t.updatePiecePriority(piece, reason)
1217 }
1218
1219 func (t *Torrent) numReceivedConns() (ret int) {
1220         for c := range t.conns {
1221                 if c.Discovery == PeerSourceIncoming {
1222                         ret++
1223                 }
1224         }
1225         return
1226 }
1227
1228 func (t *Torrent) maxHalfOpen() int {
1229         // Note that if we somehow exceed the maximum established conns, we want
1230         // the negative value to have an effect.
1231         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1232         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1233         // We want to allow some experimentation with new peers, and to try to
1234         // upset an oversupply of received connections.
1235         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1236 }
1237
1238 func (t *Torrent) openNewConns() (initiated int) {
1239         defer t.updateWantPeersEvent()
1240         for t.peers.Len() != 0 {
1241                 if !t.wantConns() {
1242                         return
1243                 }
1244                 if len(t.halfOpen) >= t.maxHalfOpen() {
1245                         return
1246                 }
1247                 if len(t.cl.dialers) == 0 {
1248                         return
1249                 }
1250                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1251                         return
1252                 }
1253                 p := t.peers.PopMax()
1254                 t.initiateConn(p)
1255                 initiated++
1256         }
1257         return
1258 }
1259
1260 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1261         p := t.piece(piece)
1262         uncached := t.pieceCompleteUncached(piece)
1263         cached := p.completion()
1264         changed := cached != uncached
1265         complete := uncached.Complete
1266         p.storageCompletionOk = uncached.Ok
1267         x := uint32(piece)
1268         if complete {
1269                 t._completedPieces.Add(x)
1270                 t.openNewConns()
1271         } else {
1272                 t._completedPieces.Remove(x)
1273         }
1274         p.t.updatePieceRequestOrder(piece)
1275         t.updateComplete()
1276         if complete && len(p.dirtiers) != 0 {
1277                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1278         }
1279         if changed {
1280                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger)
1281                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1282         }
1283         return changed
1284 }
1285
1286 // Non-blocking read. Client lock is not required.
1287 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1288         for len(b) != 0 {
1289                 p := &t.pieces[off/t.info.PieceLength]
1290                 p.waitNoPendingWrites()
1291                 var n1 int
1292                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1293                 if n1 == 0 {
1294                         break
1295                 }
1296                 off += int64(n1)
1297                 n += n1
1298                 b = b[n1:]
1299         }
1300         return
1301 }
1302
1303 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1304 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1305 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1306 func (t *Torrent) maybeCompleteMetadata() error {
1307         if t.haveInfo() {
1308                 // Nothing to do.
1309                 return nil
1310         }
1311         if !t.haveAllMetadataPieces() {
1312                 // Don't have enough metadata pieces.
1313                 return nil
1314         }
1315         err := t.setInfoBytesLocked(t.metadataBytes)
1316         if err != nil {
1317                 t.invalidateMetadata()
1318                 return fmt.Errorf("error setting info bytes: %s", err)
1319         }
1320         if t.cl.config.Debug {
1321                 t.logger.Printf("%s: got metadata from peers", t)
1322         }
1323         return nil
1324 }
1325
1326 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1327         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1328                 if end > begin {
1329                         now.Add(bitmap.BitIndex(begin))
1330                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1331                 }
1332                 return true
1333         })
1334         return
1335 }
1336
1337 func (t *Torrent) needData() bool {
1338         if t.closed.IsSet() {
1339                 return false
1340         }
1341         if !t.haveInfo() {
1342                 return true
1343         }
1344         return !t._pendingPieces.IsEmpty()
1345 }
1346
1347 func appendMissingStrings(old, new []string) (ret []string) {
1348         ret = old
1349 new:
1350         for _, n := range new {
1351                 for _, o := range old {
1352                         if o == n {
1353                                 continue new
1354                         }
1355                 }
1356                 ret = append(ret, n)
1357         }
1358         return
1359 }
1360
1361 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1362         ret = existing
1363         for minNumTiers > len(ret) {
1364                 ret = append(ret, nil)
1365         }
1366         return
1367 }
1368
1369 func (t *Torrent) addTrackers(announceList [][]string) {
1370         fullAnnounceList := &t.metainfo.AnnounceList
1371         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1372         for tierIndex, trackerURLs := range announceList {
1373                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1374         }
1375         t.startMissingTrackerScrapers()
1376         t.updateWantPeersEvent()
1377 }
1378
1379 // Don't call this before the info is available.
1380 func (t *Torrent) bytesCompleted() int64 {
1381         if !t.haveInfo() {
1382                 return 0
1383         }
1384         return *t.length - t.bytesLeft()
1385 }
1386
1387 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1388         t.cl.lock()
1389         defer t.cl.unlock()
1390         return t.setInfoBytesLocked(b)
1391 }
1392
1393 // Returns true if connection is removed from torrent.Conns.
1394 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1395         if !c.closed.IsSet() {
1396                 panic("connection is not closed")
1397                 // There are behaviours prevented by the closed state that will fail
1398                 // if the connection has been deleted.
1399         }
1400         _, ret = t.conns[c]
1401         delete(t.conns, c)
1402         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1403         // the drop event against the PexConnState instead.
1404         if ret {
1405                 if !t.cl.config.DisablePEX {
1406                         t.pex.Drop(c)
1407                 }
1408         }
1409         torrent.Add("deleted connections", 1)
1410         c.deleteAllRequests()
1411         t.assertPendingRequests()
1412         return
1413 }
1414
1415 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1416         if !t.haveInfo() {
1417                 return
1418         }
1419         p.newPeerPieces().Iterate(func(i uint32) bool {
1420                 p.t.decPieceAvailability(pieceIndex(i))
1421                 return true
1422         })
1423 }
1424
1425 func (t *Torrent) assertPendingRequests() {
1426         if !check {
1427                 return
1428         }
1429         // var actual pendingRequests
1430         // if t.haveInfo() {
1431         //      actual.m = make([]int, t.numRequests())
1432         // }
1433         // t.iterPeers(func(p *Peer) {
1434         //      p.actualRequestState.Requests.Iterate(func(x uint32) bool {
1435         //              actual.Inc(x)
1436         //              return true
1437         //      })
1438         // })
1439         // diff := cmp.Diff(actual.m, t.pendingRequests.m)
1440         // if diff != "" {
1441         //      panic(diff)
1442         // }
1443 }
1444
1445 func (t *Torrent) dropConnection(c *PeerConn) {
1446         t.cl.event.Broadcast()
1447         c.close()
1448         if t.deletePeerConn(c) {
1449                 t.openNewConns()
1450         }
1451 }
1452
1453 // Peers as in contact information for dialing out.
1454 func (t *Torrent) wantPeers() bool {
1455         if t.closed.IsSet() {
1456                 return false
1457         }
1458         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1459                 return false
1460         }
1461         return t.wantConns()
1462 }
1463
1464 func (t *Torrent) updateWantPeersEvent() {
1465         if t.wantPeers() {
1466                 t.wantPeersEvent.Set()
1467         } else {
1468                 t.wantPeersEvent.Clear()
1469         }
1470 }
1471
1472 // Returns whether the client should make effort to seed the torrent.
1473 func (t *Torrent) seeding() bool {
1474         cl := t.cl
1475         if t.closed.IsSet() {
1476                 return false
1477         }
1478         if t.dataUploadDisallowed {
1479                 return false
1480         }
1481         if cl.config.NoUpload {
1482                 return false
1483         }
1484         if !cl.config.Seed {
1485                 return false
1486         }
1487         if cl.config.DisableAggressiveUpload && t.needData() {
1488                 return false
1489         }
1490         return true
1491 }
1492
1493 func (t *Torrent) onWebRtcConn(
1494         c datachannel.ReadWriteCloser,
1495         dcc webtorrent.DataChannelContext,
1496 ) {
1497         defer c.Close()
1498         pc, err := t.cl.initiateProtocolHandshakes(
1499                 context.Background(),
1500                 webrtcNetConn{c, dcc},
1501                 t,
1502                 dcc.LocalOffered,
1503                 false,
1504                 webrtcNetAddr{dcc.Remote},
1505                 webrtcNetwork,
1506                 fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
1507         )
1508         if err != nil {
1509                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1510                 return
1511         }
1512         if dcc.LocalOffered {
1513                 pc.Discovery = PeerSourceTracker
1514         } else {
1515                 pc.Discovery = PeerSourceIncoming
1516         }
1517         pc.conn.SetWriteDeadline(time.Time{})
1518         t.cl.lock()
1519         defer t.cl.unlock()
1520         err = t.cl.runHandshookConn(pc, t)
1521         if err != nil {
1522                 t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
1523         }
1524 }
1525
1526 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1527         err := t.cl.runHandshookConn(pc, t)
1528         if err != nil || logAll {
1529                 t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
1530         }
1531 }
1532
1533 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1534         t.logRunHandshookConn(pc, false, log.Debug)
1535 }
1536
1537 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1538         wtc, release := t.cl.websocketTrackers.Get(u.String())
1539         go func() {
1540                 <-t.closed.Done()
1541                 release()
1542         }()
1543         wst := websocketTrackerStatus{u, wtc}
1544         go func() {
1545                 err := wtc.Announce(tracker.Started, t.infoHash)
1546                 if err != nil {
1547                         t.logger.WithDefaultLevel(log.Warning).Printf(
1548                                 "error in initial announce to %q: %v",
1549                                 u.String(), err,
1550                         )
1551                 }
1552         }()
1553         return wst
1554 }
1555
1556 func (t *Torrent) startScrapingTracker(_url string) {
1557         if _url == "" {
1558                 return
1559         }
1560         u, err := url.Parse(_url)
1561         if err != nil {
1562                 // URLs with a leading '*' appear to be a uTorrent convention to
1563                 // disable trackers.
1564                 if _url[0] != '*' {
1565                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1566                 }
1567                 return
1568         }
1569         if u.Scheme == "udp" {
1570                 u.Scheme = "udp4"
1571                 t.startScrapingTracker(u.String())
1572                 u.Scheme = "udp6"
1573                 t.startScrapingTracker(u.String())
1574                 return
1575         }
1576         if _, ok := t.trackerAnnouncers[_url]; ok {
1577                 return
1578         }
1579         sl := func() torrentTrackerAnnouncer {
1580                 switch u.Scheme {
1581                 case "ws", "wss":
1582                         if t.cl.config.DisableWebtorrent {
1583                                 return nil
1584                         }
1585                         return t.startWebsocketAnnouncer(*u)
1586                 case "udp4":
1587                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1588                                 return nil
1589                         }
1590                 case "udp6":
1591                         if t.cl.config.DisableIPv6 {
1592                                 return nil
1593                         }
1594                 }
1595                 newAnnouncer := &trackerScraper{
1596                         u:               *u,
1597                         t:               t,
1598                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1599                 }
1600                 go newAnnouncer.Run()
1601                 return newAnnouncer
1602         }()
1603         if sl == nil {
1604                 return
1605         }
1606         if t.trackerAnnouncers == nil {
1607                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1608         }
1609         t.trackerAnnouncers[_url] = sl
1610 }
1611
1612 // Adds and starts tracker scrapers for tracker URLs that aren't already
1613 // running.
1614 func (t *Torrent) startMissingTrackerScrapers() {
1615         if t.cl.config.DisableTrackers {
1616                 return
1617         }
1618         t.startScrapingTracker(t.metainfo.Announce)
1619         for _, tier := range t.metainfo.AnnounceList {
1620                 for _, url := range tier {
1621                         t.startScrapingTracker(url)
1622                 }
1623         }
1624 }
1625
1626 // Returns an AnnounceRequest with fields filled out to defaults and current
1627 // values.
1628 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1629         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1630         // dependent on the network in use.
1631         return tracker.AnnounceRequest{
1632                 Event: event,
1633                 NumWant: func() int32 {
1634                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1635                                 return -1
1636                         } else {
1637                                 return 0
1638                         }
1639                 }(),
1640                 Port:     uint16(t.cl.incomingPeerPort()),
1641                 PeerId:   t.cl.peerID,
1642                 InfoHash: t.infoHash,
1643                 Key:      t.cl.announceKey(),
1644
1645                 // The following are vaguely described in BEP 3.
1646
1647                 Left:     t.bytesLeftAnnounce(),
1648                 Uploaded: t.stats.BytesWrittenData.Int64(),
1649                 // There's no mention of wasted or unwanted download in the BEP.
1650                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1651         }
1652 }
1653
1654 // Adds peers revealed in an announce until the announce ends, or we have
1655 // enough peers.
1656 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1657         cl := t.cl
1658         for v := range pvs {
1659                 cl.lock()
1660                 added := 0
1661                 for _, cp := range v.Peers {
1662                         if cp.Port == 0 {
1663                                 // Can't do anything with this.
1664                                 continue
1665                         }
1666                         if t.addPeer(PeerInfo{
1667                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1668                                 Source: PeerSourceDhtGetPeers,
1669                         }) {
1670                                 added++
1671                         }
1672                 }
1673                 cl.unlock()
1674                 // if added != 0 {
1675                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1676                 // }
1677         }
1678 }
1679
1680 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1681 // announce ends. stop will force the announce to end.
1682 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1683         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1684         if err != nil {
1685                 return
1686         }
1687         _done := make(chan struct{})
1688         done = _done
1689         stop = ps.Close
1690         go func() {
1691                 t.consumeDhtAnnouncePeers(ps.Peers())
1692                 close(_done)
1693         }()
1694         return
1695 }
1696
1697 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1698         _, stop, err := t.AnnounceToDht(s)
1699         if err != nil {
1700                 return err
1701         }
1702         select {
1703         case <-t.closed.Done():
1704         case <-time.After(5 * time.Minute):
1705         }
1706         stop()
1707         return nil
1708 }
1709
1710 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1711         cl := t.cl
1712         cl.lock()
1713         defer cl.unlock()
1714         for {
1715                 for {
1716                         if t.closed.IsSet() {
1717                                 return
1718                         }
1719                         // We're also announcing ourselves as a listener, so we don't just want peer addresses.
1720                         // TODO: We can include the announce_peer step depending on whether we can receive
1721                         // inbound connections. We should probably only announce once every 15 mins too.
1722                         if !t.wantConns() {
1723                                 goto wait
1724                         }
1725                         // TODO: Determine if there's a listener on the port we're announcing.
1726                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1727                                 goto wait
1728                         }
1729                         break
1730                 wait:
1731                         cl.event.Wait()
1732                 }
1733                 func() {
1734                         t.numDHTAnnounces++
1735                         cl.unlock()
1736                         defer cl.lock()
1737                         err := t.timeboxedAnnounceToDht(s)
1738                         if err != nil {
1739                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1740                         }
1741                 }()
1742         }
1743 }
1744
1745 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1746         for _, p := range peers {
1747                 if t.addPeer(p) {
1748                         added++
1749                 }
1750         }
1751         return
1752 }
1753
1754 // The returned TorrentStats may require alignment in memory. See
1755 // https://github.com/anacrolix/torrent/issues/383.
1756 func (t *Torrent) Stats() TorrentStats {
1757         t.cl.rLock()
1758         defer t.cl.rUnlock()
1759         return t.statsLocked()
1760 }
1761
1762 func (t *Torrent) statsLocked() (ret TorrentStats) {
1763         ret.ActivePeers = len(t.conns)
1764         ret.HalfOpenPeers = len(t.halfOpen)
1765         ret.PendingPeers = t.peers.Len()
1766         ret.TotalPeers = t.numTotalPeers()
1767         ret.ConnectedSeeders = 0
1768         for c := range t.conns {
1769                 if all, ok := c.peerHasAllPieces(); all && ok {
1770                         ret.ConnectedSeeders++
1771                 }
1772         }
1773         ret.ConnStats = t.stats.Copy()
1774         ret.PiecesComplete = t.numPiecesCompleted()
1775         return
1776 }
1777
1778 // The total number of peers in the torrent.
1779 func (t *Torrent) numTotalPeers() int {
1780         peers := make(map[string]struct{})
1781         for conn := range t.conns {
1782                 ra := conn.conn.RemoteAddr()
1783                 if ra == nil {
1784                         // It's been closed and doesn't support RemoteAddr.
1785                         continue
1786                 }
1787                 peers[ra.String()] = struct{}{}
1788         }
1789         for addr := range t.halfOpen {
1790                 peers[addr] = struct{}{}
1791         }
1792         t.peers.Each(func(peer PeerInfo) {
1793                 peers[peer.Addr.String()] = struct{}{}
1794         })
1795         return len(peers)
1796 }
1797
1798 // Reconcile bytes transferred before connection was associated with a
1799 // torrent.
1800 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1801         if c._stats != (ConnStats{
1802                 // Handshakes should only increment these fields:
1803                 BytesWritten: c._stats.BytesWritten,
1804                 BytesRead:    c._stats.BytesRead,
1805         }) {
1806                 panic("bad stats")
1807         }
1808         c.postHandshakeStats(func(cs *ConnStats) {
1809                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1810                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1811         })
1812         c.reconciledHandshakeStats = true
1813 }
1814
1815 // Returns true if the connection is added.
1816 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1817         defer func() {
1818                 if err == nil {
1819                         torrent.Add("added connections", 1)
1820                 }
1821         }()
1822         if t.closed.IsSet() {
1823                 return errors.New("torrent closed")
1824         }
1825         for c0 := range t.conns {
1826                 if c.PeerID != c0.PeerID {
1827                         continue
1828                 }
1829                 if !t.cl.config.DropDuplicatePeerIds {
1830                         continue
1831                 }
1832                 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1833                         c0.close()
1834                         t.deletePeerConn(c0)
1835                 } else {
1836                         return errors.New("existing connection preferred")
1837                 }
1838         }
1839         if len(t.conns) >= t.maxEstablishedConns {
1840                 c := t.worstBadConn()
1841                 if c == nil {
1842                         return errors.New("don't want conns")
1843                 }
1844                 c.close()
1845                 t.deletePeerConn(c)
1846         }
1847         if len(t.conns) >= t.maxEstablishedConns {
1848                 panic(len(t.conns))
1849         }
1850         t.conns[c] = struct{}{}
1851         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
1852                 t.pex.Add(c) // as no further extended handshake expected
1853         }
1854         return nil
1855 }
1856
1857 func (t *Torrent) wantConns() bool {
1858         if !t.networkingEnabled.Bool() {
1859                 return false
1860         }
1861         if t.closed.IsSet() {
1862                 return false
1863         }
1864         if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
1865                 return false
1866         }
1867         return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
1868 }
1869
1870 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1871         t.cl.lock()
1872         defer t.cl.unlock()
1873         oldMax = t.maxEstablishedConns
1874         t.maxEstablishedConns = max
1875         wcs := worseConnSlice{
1876                 conns: t.appendConns(nil, func(*PeerConn) bool {
1877                         return true
1878                 }),
1879         }
1880         wcs.initKeys()
1881         heap.Init(&wcs)
1882         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1883                 t.dropConnection(heap.Pop(&wcs).(*PeerConn))
1884         }
1885         t.openNewConns()
1886         return oldMax
1887 }
1888
1889 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
1890         t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, passed).SetLevel(log.Debug))
1891         p := t.piece(piece)
1892         p.numVerifies++
1893         t.cl.event.Broadcast()
1894         if t.closed.IsSet() {
1895                 return
1896         }
1897
1898         // Don't score the first time a piece is hashed, it could be an initial check.
1899         if p.storageCompletionOk {
1900                 if passed {
1901                         pieceHashedCorrect.Add(1)
1902                 } else {
1903                         log.Fmsg(
1904                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
1905                         ).AddValues(t, p).SetLevel(log.Debug).Log(t.logger)
1906                         pieceHashedNotCorrect.Add(1)
1907                 }
1908         }
1909
1910         p.marking = true
1911         t.publishPieceChange(piece)
1912         defer func() {
1913                 p.marking = false
1914                 t.publishPieceChange(piece)
1915         }()
1916
1917         if passed {
1918                 if len(p.dirtiers) != 0 {
1919                         // Don't increment stats above connection-level for every involved connection.
1920                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1921                 }
1922                 for c := range p.dirtiers {
1923                         c._stats.incrementPiecesDirtiedGood()
1924                 }
1925                 t.clearPieceTouchers(piece)
1926                 t.cl.unlock()
1927                 err := p.Storage().MarkComplete()
1928                 if err != nil {
1929                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1930                 }
1931                 t.cl.lock()
1932
1933                 if t.closed.IsSet() {
1934                         return
1935                 }
1936                 t.pendAllChunkSpecs(piece)
1937         } else {
1938                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
1939                         // Peers contributed to all the data for this piece hash failure, and the failure was
1940                         // not due to errors in the storage (such as data being dropped in a cache).
1941
1942                         // Increment Torrent and above stats, and then specific connections.
1943                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1944                         for c := range p.dirtiers {
1945                                 // Y u do dis peer?!
1946                                 c.stats().incrementPiecesDirtiedBad()
1947                         }
1948
1949                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
1950                         for c := range p.dirtiers {
1951                                 if !c.trusted {
1952                                         bannableTouchers = append(bannableTouchers, c)
1953                                 }
1954                         }
1955                         t.clearPieceTouchers(piece)
1956                         slices.Sort(bannableTouchers, connLessTrusted)
1957
1958                         if t.cl.config.Debug {
1959                                 t.logger.Printf(
1960                                         "bannable conns by trust for piece %d: %v",
1961                                         piece,
1962                                         func() (ret []connectionTrust) {
1963                                                 for _, c := range bannableTouchers {
1964                                                         ret = append(ret, c.trust())
1965                                                 }
1966                                                 return
1967                                         }(),
1968                                 )
1969                         }
1970
1971                         if len(bannableTouchers) >= 1 {
1972                                 c := bannableTouchers[0]
1973                                 t.cl.banPeerIP(c.remoteIp())
1974                                 c.drop()
1975                         }
1976                 }
1977                 t.onIncompletePiece(piece)
1978                 p.Storage().MarkNotComplete()
1979         }
1980         t.updatePieceCompletion(piece)
1981 }
1982
1983 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
1984         // TODO: Make faster
1985         for cn := range t.conns {
1986                 cn.tickleWriter()
1987         }
1988 }
1989
1990 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
1991         t.pendAllChunkSpecs(piece)
1992         t.cancelRequestsForPiece(piece)
1993         t.piece(piece).readerCond.Broadcast()
1994         for conn := range t.conns {
1995                 conn.have(piece)
1996                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
1997         }
1998 }
1999
2000 // Called when a piece is found to be not complete.
2001 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
2002         if t.pieceAllDirty(piece) {
2003                 t.pendAllChunkSpecs(piece)
2004         }
2005         if !t.wantPieceIndex(piece) {
2006                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
2007                 return
2008         }
2009         // We could drop any connections that we told we have a piece that we
2010         // don't here. But there's a test failure, and it seems clients don't care
2011         // if you request pieces that you already claim to have. Pruning bad
2012         // connections might just remove any connections that aren't treating us
2013         // favourably anyway.
2014
2015         // for c := range t.conns {
2016         //      if c.sentHave(piece) {
2017         //              c.drop()
2018         //      }
2019         // }
2020         t.iterPeers(func(conn *Peer) {
2021                 if conn.peerHasPiece(piece) {
2022                         conn.updateRequests("piece incomplete")
2023                 }
2024         })
2025 }
2026
2027 func (t *Torrent) tryCreateMorePieceHashers() {
2028         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
2029         }
2030 }
2031
2032 func (t *Torrent) tryCreatePieceHasher() bool {
2033         if t.storage == nil {
2034                 return false
2035         }
2036         pi, ok := t.getPieceToHash()
2037         if !ok {
2038                 return false
2039         }
2040         p := t.piece(pi)
2041         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2042         p.hashing = true
2043         t.publishPieceChange(pi)
2044         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
2045         t.storageLock.RLock()
2046         t.activePieceHashes++
2047         go t.pieceHasher(pi)
2048         return true
2049 }
2050
2051 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2052         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2053                 if t.piece(i).hashing {
2054                         return true
2055                 }
2056                 ret = i
2057                 ok = true
2058                 return false
2059         })
2060         return
2061 }
2062
2063 func (t *Torrent) pieceHasher(index pieceIndex) {
2064         p := t.piece(index)
2065         sum, copyErr := t.hashPiece(index)
2066         correct := sum == *p.hash
2067         switch copyErr {
2068         case nil, io.EOF:
2069         default:
2070                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2071         }
2072         t.storageLock.RUnlock()
2073         t.cl.lock()
2074         defer t.cl.unlock()
2075         p.hashing = false
2076         t.pieceHashed(index, correct, copyErr)
2077         t.updatePiecePriority(index, "Torrent.pieceHasher")
2078         t.activePieceHashes--
2079         t.tryCreateMorePieceHashers()
2080 }
2081
2082 // Return the connections that touched a piece, and clear the entries while doing it.
2083 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2084         p := t.piece(pi)
2085         for c := range p.dirtiers {
2086                 delete(c.peerTouchedPieces, pi)
2087                 delete(p.dirtiers, c)
2088         }
2089 }
2090
2091 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2092         t.iterPeers(func(p *Peer) {
2093                 ret = append(ret, p)
2094         })
2095         return
2096 }
2097
2098 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2099         piece := t.piece(pieceIndex)
2100         if piece.queuedForHash() {
2101                 return
2102         }
2103         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2104         t.publishPieceChange(pieceIndex)
2105         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2106         t.tryCreateMorePieceHashers()
2107 }
2108
2109 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2110 // before the Info is available.
2111 func (t *Torrent) VerifyData() {
2112         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2113                 t.Piece(i).VerifyData()
2114         }
2115 }
2116
2117 // Start the process of connecting to the given peer for the given torrent if appropriate.
2118 func (t *Torrent) initiateConn(peer PeerInfo) {
2119         if peer.Id == t.cl.peerID {
2120                 return
2121         }
2122         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2123                 return
2124         }
2125         addr := peer.Addr
2126         if t.addrActive(addr.String()) {
2127                 return
2128         }
2129         t.cl.numHalfOpen++
2130         t.halfOpen[addr.String()] = peer
2131         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
2132 }
2133
2134 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2135 // quickly make one Client visible to the Torrent of another Client.
2136 func (t *Torrent) AddClientPeer(cl *Client) int {
2137         return t.AddPeers(func() (ps []PeerInfo) {
2138                 for _, la := range cl.ListenAddrs() {
2139                         ps = append(ps, PeerInfo{
2140                                 Addr:    la,
2141                                 Trusted: true,
2142                         })
2143                 }
2144                 return
2145         }())
2146 }
2147
2148 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2149 // connection.
2150 func (t *Torrent) allStats(f func(*ConnStats)) {
2151         f(&t.stats)
2152         f(&t.cl.stats)
2153 }
2154
2155 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2156         return t.pieces[i].hashing
2157 }
2158
2159 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2160         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2161 }
2162
2163 func (t *Torrent) dialTimeout() time.Duration {
2164         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2165 }
2166
2167 func (t *Torrent) piece(i int) *Piece {
2168         return &t.pieces[i]
2169 }
2170
2171 func (t *Torrent) onWriteChunkErr(err error) {
2172         if t.userOnWriteChunkErr != nil {
2173                 go t.userOnWriteChunkErr(err)
2174                 return
2175         }
2176         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2177         t.disallowDataDownloadLocked()
2178 }
2179
2180 func (t *Torrent) DisallowDataDownload() {
2181         t.disallowDataDownloadLocked()
2182 }
2183
2184 func (t *Torrent) disallowDataDownloadLocked() {
2185         t.dataDownloadDisallowed.Set()
2186 }
2187
2188 func (t *Torrent) AllowDataDownload() {
2189         t.dataDownloadDisallowed.Clear()
2190 }
2191
2192 // Enables uploading data, if it was disabled.
2193 func (t *Torrent) AllowDataUpload() {
2194         t.cl.lock()
2195         defer t.cl.unlock()
2196         t.dataUploadDisallowed = false
2197         for c := range t.conns {
2198                 c.updateRequests("allow data upload")
2199         }
2200 }
2201
2202 // Disables uploading data, if it was enabled.
2203 func (t *Torrent) DisallowDataUpload() {
2204         t.cl.lock()
2205         defer t.cl.unlock()
2206         t.dataUploadDisallowed = true
2207         for c := range t.conns {
2208                 c.updateRequests("disallow data upload")
2209         }
2210 }
2211
2212 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2213 // or if nil, a critical message is logged, and data download is disabled.
2214 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2215         t.cl.lock()
2216         defer t.cl.unlock()
2217         t.userOnWriteChunkErr = f
2218 }
2219
2220 func (t *Torrent) iterPeers(f func(p *Peer)) {
2221         for pc := range t.conns {
2222                 f(&pc.Peer)
2223         }
2224         for _, ws := range t.webSeeds {
2225                 f(ws)
2226         }
2227 }
2228
2229 func (t *Torrent) callbacks() *Callbacks {
2230         return &t.cl.config.Callbacks
2231 }
2232
2233 func (t *Torrent) addWebSeed(url string) {
2234         if t.cl.config.DisableWebseeds {
2235                 return
2236         }
2237         if _, ok := t.webSeeds[url]; ok {
2238                 return
2239         }
2240         // I don't think Go http supports pipelining requests. However, we can have more ready to go
2241         // right away. This value should be some multiple of the number of connections to a host. I
2242         // would expect that double maxRequests plus a bit would be appropriate. This value is based on
2243         // downloading Sintel (08ada5a7a6183aae1e09d831df6748d566095a10) from
2244         // "https://webtorrent.io/torrents/".
2245         const maxRequests = 16
2246         ws := webseedPeer{
2247                 peer: Peer{
2248                         t:                        t,
2249                         outgoing:                 true,
2250                         Network:                  "http",
2251                         reconciledHandshakeStats: true,
2252                         // This should affect how often we have to recompute requests for this peer. Note that
2253                         // because we can request more than 1 thing at a time over HTTP, we will hit the low
2254                         // requests mark more often, so recomputation is probably sooner than with regular peer
2255                         // conns. ~4x maxRequests would be about right.
2256                         PeerMaxRequests: 128,
2257                         RemoteAddr:      remoteAddrFromUrl(url),
2258                         callbacks:       t.callbacks(),
2259                 },
2260                 client: webseed.Client{
2261                         HttpClient: t.cl.webseedHttpClient,
2262                         Url:        url,
2263                 },
2264                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2265                 maxRequests:    maxRequests,
2266         }
2267         ws.peer.initUpdateRequestsTimer()
2268         ws.requesterCond.L = t.cl.locker()
2269         for i := 0; i < maxRequests; i += 1 {
2270                 go ws.requester(i)
2271         }
2272         for _, f := range t.callbacks().NewPeer {
2273                 f(&ws.peer)
2274         }
2275         ws.peer.logger = t.logger.WithContextValue(&ws)
2276         ws.peer.peerImpl = &ws
2277         if t.haveInfo() {
2278                 ws.onGotInfo(t.info)
2279         }
2280         t.webSeeds[url] = &ws.peer
2281 }
2282
2283 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2284         t.iterPeers(func(p1 *Peer) {
2285                 if p1 == p {
2286                         active = true
2287                 }
2288         })
2289         return
2290 }
2291
2292 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2293         index := ri / t.chunksPerRegularPiece()
2294         return Request{
2295                 pp.Integer(index),
2296                 t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2297         }
2298 }
2299
2300 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2301         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
2302 }
2303
2304 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2305         return RequestIndex(piece) * t.chunksPerRegularPiece()
2306 }
2307
2308 func (t *Torrent) updateComplete() {
2309         t.Complete.SetBool(t.haveAllPieces())
2310 }