]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Remove unused Torrent.numActivePeers
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "net/http"
12         "net/url"
13         "sort"
14         "strings"
15         "text/tabwriter"
16         "time"
17         "unsafe"
18
19         "github.com/RoaringBitmap/roaring"
20         "github.com/anacrolix/chansync"
21         "github.com/anacrolix/chansync/events"
22         "github.com/anacrolix/dht/v2"
23         "github.com/anacrolix/log"
24         "github.com/anacrolix/missinggo/perf"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/missinggo/slices"
27         "github.com/anacrolix/missinggo/v2"
28         "github.com/anacrolix/missinggo/v2/bitmap"
29         "github.com/anacrolix/multiless"
30         "github.com/anacrolix/sync"
31         "github.com/davecgh/go-spew/spew"
32         "github.com/google/go-cmp/cmp"
33         "github.com/pion/datachannel"
34
35         "github.com/anacrolix/torrent/bencode"
36         "github.com/anacrolix/torrent/common"
37         "github.com/anacrolix/torrent/metainfo"
38         pp "github.com/anacrolix/torrent/peer_protocol"
39         "github.com/anacrolix/torrent/segments"
40         "github.com/anacrolix/torrent/storage"
41         "github.com/anacrolix/torrent/tracker"
42         "github.com/anacrolix/torrent/webseed"
43         "github.com/anacrolix/torrent/webtorrent"
44 )
45
46 // Maintains state of torrent within a Client. Many methods should not be called before the info is
47 // available, see .Info and .GotInfo.
48 type Torrent struct {
49         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
50         // alignment. See #262.
51         stats  ConnStats
52         cl     *Client
53         logger log.Logger
54
55         networkingEnabled      chansync.Flag
56         dataDownloadDisallowed chansync.Flag
57         dataUploadDisallowed   bool
58         userOnWriteChunkErr    func(error)
59
60         closed   chansync.SetOnce
61         infoHash metainfo.Hash
62         pieces   []Piece
63         // Values are the piece indices that changed.
64         pieceStateChanges *pubsub.PubSub
65         // The size of chunks to request from peers over the wire. This is
66         // normally 16KiB by convention these days.
67         chunkSize pp.Integer
68         chunkPool sync.Pool
69         // Total length of the torrent in bytes. Stored because it's not O(1) to
70         // get this from the info dict.
71         length *int64
72
73         // The storage to open when the info dict becomes available.
74         storageOpener *storage.Client
75         // Storage for torrent data.
76         storage *storage.Torrent
77         // Read-locked for using storage, and write-locked for Closing.
78         storageLock sync.RWMutex
79
80         // TODO: Only announce stuff is used?
81         metainfo metainfo.MetaInfo
82
83         // The info dict. nil if we don't have it (yet).
84         info      *metainfo.Info
85         fileIndex segments.Index
86         files     *[]*File
87
88         webSeeds map[string]*Peer
89
90         // Active peer connections, running message stream loops. TODO: Make this
91         // open (not-closed) connections only.
92         conns               map[*PeerConn]struct{}
93         maxEstablishedConns int
94         // Set of addrs to which we're attempting to connect. Connections are
95         // half-open until all handshakes are completed.
96         halfOpen map[string]PeerInfo
97
98         // Reserve of peers to connect to. A peer can be both here and in the
99         // active connections if were told about the peer after connecting with
100         // them. That encourages us to reconnect to peers that are well known in
101         // the swarm.
102         peers prioritizedPeers
103         // Whether we want to know to know more peers.
104         wantPeersEvent missinggo.Event
105         // An announcer for each tracker URL.
106         trackerAnnouncers map[string]torrentTrackerAnnouncer
107         // How many times we've initiated a DHT announce. TODO: Move into stats.
108         numDHTAnnounces int
109
110         // Name used if the info name isn't available. Should be cleared when the
111         // Info does become available.
112         nameMu      sync.RWMutex
113         displayName string
114
115         // The bencoded bytes of the info dict. This is actively manipulated if
116         // the info bytes aren't initially available, and we try to fetch them
117         // from peers.
118         metadataBytes []byte
119         // Each element corresponds to the 16KiB metadata pieces. If true, we have
120         // received that piece.
121         metadataCompletedChunks []bool
122         metadataChanged         sync.Cond
123
124         // Closed when .Info is obtained.
125         gotMetainfoC chan struct{}
126
127         readers                map[*reader]struct{}
128         _readerNowPieces       bitmap.Bitmap
129         _readerReadaheadPieces bitmap.Bitmap
130
131         // A cache of pieces we need to get. Calculated from various piece and
132         // file priorities and completion states elsewhere.
133         _pendingPieces roaring.Bitmap
134         // A cache of completed piece indices.
135         _completedPieces roaring.Bitmap
136         // Pieces that need to be hashed.
137         piecesQueuedForHash       bitmap.Bitmap
138         activePieceHashes         int
139         initialPieceCheckDisabled bool
140
141         // Count of each request across active connections.
142         pendingRequests pendingRequests
143         // Chunks we've written to since the corresponding piece was last checked.
144         dirtyChunks roaring.Bitmap
145
146         pex pexState
147
148         // Is On when all pieces are complete.
149         Complete chansync.Flag
150 }
151
152 func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
153         t.iterPeers(func(peer *Peer) {
154                 if peer.peerHasPiece(i) {
155                         count++
156                 }
157         })
158         return
159 }
160
161 func (t *Torrent) decPieceAvailability(i pieceIndex) {
162         if !t.haveInfo() {
163                 return
164         }
165         p := t.piece(i)
166         if p.availability <= 0 {
167                 panic(p.availability)
168         }
169         p.availability--
170 }
171
172 func (t *Torrent) incPieceAvailability(i pieceIndex) {
173         // If we don't the info, this should be reconciled when we do.
174         if t.haveInfo() {
175                 p := t.piece(i)
176                 p.availability++
177         }
178 }
179
180 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
181         return t._readerNowPieces
182 }
183
184 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
185         return t._readerReadaheadPieces
186 }
187
188 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
189         return !t.wantPieceIndex(i)
190 }
191
192 // Returns a channel that is closed when the Torrent is closed.
193 func (t *Torrent) Closed() events.Done {
194         return t.closed.Done()
195 }
196
197 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
198 // pending, and half-open peers.
199 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
200         // Add pending peers to the list
201         t.peers.Each(func(peer PeerInfo) {
202                 ks = append(ks, peer)
203         })
204
205         // Add half-open peers to the list
206         for _, peer := range t.halfOpen {
207                 ks = append(ks, peer)
208         }
209
210         // Add active peers to the list
211         for conn := range t.conns {
212
213                 ks = append(ks, PeerInfo{
214                         Id:     conn.PeerID,
215                         Addr:   conn.RemoteAddr,
216                         Source: conn.Discovery,
217                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
218                         // > But if we're not connected to them with an encrypted connection, I couldn't say
219                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
220                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
221                         // > connection. It's probably easiest to do the latter for now.
222                         // https://github.com/anacrolix/torrent/pull/188
223                         SupportsEncryption: conn.headerEncrypted,
224                 })
225         }
226
227         return
228 }
229
230 func (t *Torrent) setChunkSize(size pp.Integer) {
231         t.chunkSize = size
232         t.chunkPool = sync.Pool{
233                 New: func() interface{} {
234                         b := make([]byte, size)
235                         return &b
236                 },
237         }
238 }
239
240 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
241         return t._completedPieces.Contains(bitmap.BitIndex(piece))
242 }
243
244 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
245         return t.pieces[piece].Storage().Completion()
246 }
247
248 // There's a connection to that address already.
249 func (t *Torrent) addrActive(addr string) bool {
250         if _, ok := t.halfOpen[addr]; ok {
251                 return true
252         }
253         for c := range t.conns {
254                 ra := c.RemoteAddr
255                 if ra.String() == addr {
256                         return true
257                 }
258         }
259         return false
260 }
261
262 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
263         for c := range t.conns {
264                 if !c.closed.IsSet() {
265                         ret = append(ret, c)
266                 }
267         }
268         return ret
269 }
270
271 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
272         cl := t.cl
273         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
274         if t.closed.IsSet() {
275                 return false
276         }
277         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
278                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
279                         torrent.Add("peers not added because of bad addr", 1)
280                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
281                         return false
282                 }
283         }
284         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
285                 torrent.Add("peers replaced", 1)
286                 if !replaced.equal(p) {
287                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
288                         added = true
289                 }
290         } else {
291                 added = true
292         }
293         t.openNewConns()
294         for t.peers.Len() > cl.config.TorrentPeersHighWater {
295                 _, ok := t.peers.DeleteMin()
296                 if ok {
297                         torrent.Add("excess reserve peers discarded", 1)
298                 }
299         }
300         return
301 }
302
303 func (t *Torrent) invalidateMetadata() {
304         for i := 0; i < len(t.metadataCompletedChunks); i++ {
305                 t.metadataCompletedChunks[i] = false
306         }
307         t.nameMu.Lock()
308         t.gotMetainfoC = make(chan struct{})
309         t.info = nil
310         t.nameMu.Unlock()
311 }
312
313 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
314         if t.haveInfo() {
315                 return
316         }
317         if index >= len(t.metadataCompletedChunks) {
318                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
319                 return
320         }
321         copy(t.metadataBytes[(1<<14)*index:], data)
322         t.metadataCompletedChunks[index] = true
323 }
324
325 func (t *Torrent) metadataPieceCount() int {
326         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
327 }
328
329 func (t *Torrent) haveMetadataPiece(piece int) bool {
330         if t.haveInfo() {
331                 return (1<<14)*piece < len(t.metadataBytes)
332         } else {
333                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
334         }
335 }
336
337 func (t *Torrent) metadataSize() int {
338         return len(t.metadataBytes)
339 }
340
341 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
342         for i := 0; i < len(info.Pieces); i += sha1.Size {
343                 ret = append(ret, info.Pieces[i:i+sha1.Size])
344         }
345         return
346 }
347
348 func (t *Torrent) makePieces() {
349         hashes := infoPieceHashes(t.info)
350         t.pieces = make([]Piece, len(hashes))
351         for i, hash := range hashes {
352                 piece := &t.pieces[i]
353                 piece.t = t
354                 piece.index = pieceIndex(i)
355                 piece.noPendingWrites.L = &piece.pendingWritesMutex
356                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
357                 files := *t.files
358                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
359                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
360                 piece.files = files[beginFile:endFile]
361                 piece.undirtiedChunksIter = undirtiedChunksIter{
362                         TorrentDirtyChunks: &t.dirtyChunks,
363                         StartRequestIndex:  piece.requestIndexOffset(),
364                         EndRequestIndex:    piece.requestIndexOffset() + piece.numChunks(),
365                 }
366         }
367 }
368
369 // Returns the index of the first file containing the piece. files must be
370 // ordered by offset.
371 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
372         for i, f := range files {
373                 if f.offset+f.length > pieceOffset {
374                         return i
375                 }
376         }
377         return 0
378 }
379
380 // Returns the index after the last file containing the piece. files must be
381 // ordered by offset.
382 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
383         for i, f := range files {
384                 if f.offset+f.length >= pieceEndOffset {
385                         return i + 1
386                 }
387         }
388         return 0
389 }
390
391 func (t *Torrent) cacheLength() {
392         var l int64
393         for _, f := range t.info.UpvertedFiles() {
394                 l += f.Length
395         }
396         t.length = &l
397 }
398
399 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
400 // separately.
401 func (t *Torrent) setInfo(info *metainfo.Info) error {
402         if err := validateInfo(info); err != nil {
403                 return fmt.Errorf("bad info: %s", err)
404         }
405         if t.storageOpener != nil {
406                 var err error
407                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
408                 if err != nil {
409                         return fmt.Errorf("error opening torrent storage: %s", err)
410                 }
411         }
412         t.nameMu.Lock()
413         t.info = info
414         t.nameMu.Unlock()
415         t.updateComplete()
416         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
417         t.displayName = "" // Save a few bytes lol.
418         t.initFiles()
419         t.cacheLength()
420         t.makePieces()
421         return nil
422 }
423
424 // This seems to be all the follow-up tasks after info is set, that can't fail.
425 func (t *Torrent) onSetInfo() {
426         for i := range t.pieces {
427                 p := &t.pieces[i]
428                 // Need to add availability before updating piece completion, as that may result in conns
429                 // being dropped.
430                 if p.availability != 0 {
431                         panic(p.availability)
432                 }
433                 p.availability = int64(t.pieceAvailabilityFromPeers(i))
434                 t.updatePieceCompletion(pieceIndex(i))
435                 if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
436                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
437                         t.queuePieceCheck(pieceIndex(i))
438                 }
439         }
440         t.cl.event.Broadcast()
441         close(t.gotMetainfoC)
442         t.updateWantPeersEvent()
443         t.pendingRequests.Init(t.numRequests())
444         t.tryCreateMorePieceHashers()
445         t.iterPeers(func(p *Peer) {
446                 p.onGotInfo(t.info)
447                 p.updateRequests("onSetInfo")
448         })
449 }
450
451 // Called when metadata for a torrent becomes available.
452 func (t *Torrent) setInfoBytesLocked(b []byte) error {
453         if metainfo.HashBytes(b) != t.infoHash {
454                 return errors.New("info bytes have wrong hash")
455         }
456         var info metainfo.Info
457         if err := bencode.Unmarshal(b, &info); err != nil {
458                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
459         }
460         t.metadataBytes = b
461         t.metadataCompletedChunks = nil
462         if t.info != nil {
463                 return nil
464         }
465         if err := t.setInfo(&info); err != nil {
466                 return err
467         }
468         t.onSetInfo()
469         return nil
470 }
471
472 func (t *Torrent) haveAllMetadataPieces() bool {
473         if t.haveInfo() {
474                 return true
475         }
476         if t.metadataCompletedChunks == nil {
477                 return false
478         }
479         for _, have := range t.metadataCompletedChunks {
480                 if !have {
481                         return false
482                 }
483         }
484         return true
485 }
486
487 // TODO: Propagate errors to disconnect peer.
488 func (t *Torrent) setMetadataSize(size int) (err error) {
489         if t.haveInfo() {
490                 // We already know the correct metadata size.
491                 return
492         }
493         if uint32(size) > maxMetadataSize {
494                 return errors.New("bad size")
495         }
496         if len(t.metadataBytes) == size {
497                 return
498         }
499         t.metadataBytes = make([]byte, size)
500         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
501         t.metadataChanged.Broadcast()
502         for c := range t.conns {
503                 c.requestPendingMetadata()
504         }
505         return
506 }
507
508 // The current working name for the torrent. Either the name in the info dict,
509 // or a display name given such as by the dn value in a magnet link, or "".
510 func (t *Torrent) name() string {
511         t.nameMu.RLock()
512         defer t.nameMu.RUnlock()
513         if t.haveInfo() {
514                 return t.info.Name
515         }
516         if t.displayName != "" {
517                 return t.displayName
518         }
519         return "infohash:" + t.infoHash.HexString()
520 }
521
522 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
523         p := &t.pieces[index]
524         ret.Priority = t.piecePriority(index)
525         ret.Completion = p.completion()
526         ret.QueuedForHash = p.queuedForHash()
527         ret.Hashing = p.hashing
528         ret.Checking = ret.QueuedForHash || ret.Hashing
529         ret.Marking = p.marking
530         if !ret.Complete && t.piecePartiallyDownloaded(index) {
531                 ret.Partial = true
532         }
533         return
534 }
535
536 func (t *Torrent) metadataPieceSize(piece int) int {
537         return metadataPieceSize(len(t.metadataBytes), piece)
538 }
539
540 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
541         return pp.Message{
542                 Type:       pp.Extended,
543                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
544                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
545                         Piece:     piece,
546                         TotalSize: len(t.metadataBytes),
547                         Type:      msgType,
548                 }), data...),
549         }
550 }
551
552 type pieceAvailabilityRun struct {
553         count        pieceIndex
554         availability int64
555 }
556
557 func (me pieceAvailabilityRun) String() string {
558         return fmt.Sprintf("%v(%v)", me.count, me.availability)
559 }
560
561 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
562         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
563                 ret = append(ret, pieceAvailabilityRun{availability: el.(int64), count: int(count)})
564         })
565         for i := range t.pieces {
566                 rle.Append(t.pieces[i].availability, 1)
567         }
568         rle.Flush()
569         return
570 }
571
572 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
573         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
574                 ret = append(ret, PieceStateRun{
575                         PieceState: el.(PieceState),
576                         Length:     int(count),
577                 })
578         })
579         for index := range t.pieces {
580                 rle.Append(t.pieceState(pieceIndex(index)), 1)
581         }
582         rle.Flush()
583         return
584 }
585
586 // Produces a small string representing a PieceStateRun.
587 func (psr PieceStateRun) String() (ret string) {
588         ret = fmt.Sprintf("%d", psr.Length)
589         ret += func() string {
590                 switch psr.Priority {
591                 case PiecePriorityNext:
592                         return "N"
593                 case PiecePriorityNormal:
594                         return "."
595                 case PiecePriorityReadahead:
596                         return "R"
597                 case PiecePriorityNow:
598                         return "!"
599                 case PiecePriorityHigh:
600                         return "H"
601                 default:
602                         return ""
603                 }
604         }()
605         if psr.Hashing {
606                 ret += "H"
607         }
608         if psr.QueuedForHash {
609                 ret += "Q"
610         }
611         if psr.Marking {
612                 ret += "M"
613         }
614         if psr.Partial {
615                 ret += "P"
616         }
617         if psr.Complete {
618                 ret += "C"
619         }
620         if !psr.Ok {
621                 ret += "?"
622         }
623         return
624 }
625
626 func (t *Torrent) writeStatus(w io.Writer) {
627         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
628         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
629         if !t.haveInfo() {
630                 fmt.Fprintf(w, "Metadata have: ")
631                 for _, h := range t.metadataCompletedChunks {
632                         fmt.Fprintf(w, "%c", func() rune {
633                                 if h {
634                                         return 'H'
635                                 } else {
636                                         return '.'
637                                 }
638                         }())
639                 }
640                 fmt.Fprintln(w)
641         }
642         fmt.Fprintf(w, "Piece length: %s\n",
643                 func() string {
644                         if t.haveInfo() {
645                                 return fmt.Sprintf("%v (%v chunks)",
646                                         t.usualPieceSize(),
647                                         float64(t.usualPieceSize())/float64(t.chunkSize))
648                         } else {
649                                 return "no info"
650                         }
651                 }(),
652         )
653         if t.info != nil {
654                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
655                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
656                 fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
657                         for _, run := range t.pieceAvailabilityRuns() {
658                                 ret = append(ret, run.String())
659                         }
660                         return
661                 }(), " "))
662         }
663         fmt.Fprintf(w, "Reader Pieces:")
664         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
665                 fmt.Fprintf(w, " %d:%d", begin, end)
666                 return true
667         })
668         fmt.Fprintln(w)
669
670         fmt.Fprintf(w, "Enabled trackers:\n")
671         func() {
672                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
673                 fmt.Fprintf(tw, "    URL\tExtra\n")
674                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
675                         lu := l.URL()
676                         ru := r.URL()
677                         var luns, runs url.URL = *lu, *ru
678                         luns.Scheme = ""
679                         runs.Scheme = ""
680                         var ml missinggo.MultiLess
681                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
682                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
683                         return ml.Less()
684                 }).([]torrentTrackerAnnouncer) {
685                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
686                 }
687                 tw.Flush()
688         }()
689
690         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
691
692         spew.NewDefaultConfig()
693         spew.Fdump(w, t.statsLocked())
694
695         peers := t.peersAsSlice()
696         sort.Slice(peers, func(_i, _j int) bool {
697                 i := peers[_i]
698                 j := peers[_j]
699                 if less, ok := multiless.New().EagerSameLess(
700                         i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
701                 ).LessOk(); ok {
702                         return less
703                 }
704                 return worseConn(i, j)
705         })
706         for i, c := range peers {
707                 fmt.Fprintf(w, "%2d. ", i+1)
708                 c.writeStatus(w, t)
709         }
710 }
711
712 func (t *Torrent) haveInfo() bool {
713         return t.info != nil
714 }
715
716 // Returns a run-time generated MetaInfo that includes the info bytes and
717 // announce-list as currently known to the client.
718 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
719         return metainfo.MetaInfo{
720                 CreationDate: time.Now().Unix(),
721                 Comment:      "dynamic metainfo from client",
722                 CreatedBy:    "go.torrent",
723                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
724                 InfoBytes: func() []byte {
725                         if t.haveInfo() {
726                                 return t.metadataBytes
727                         } else {
728                                 return nil
729                         }
730                 }(),
731                 UrlList: func() []string {
732                         ret := make([]string, 0, len(t.webSeeds))
733                         for url := range t.webSeeds {
734                                 ret = append(ret, url)
735                         }
736                         return ret
737                 }(),
738         }
739 }
740
741 // Get bytes left
742 func (t *Torrent) BytesMissing() (n int64) {
743         t.cl.rLock()
744         n = t.bytesMissingLocked()
745         t.cl.rUnlock()
746         return
747 }
748
749 func (t *Torrent) bytesMissingLocked() int64 {
750         return t.bytesLeft()
751 }
752
753 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
754         roaring.Flip(b, 0, end).Iterate(cb)
755 }
756
757 func (t *Torrent) bytesLeft() (left int64) {
758         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
759                 p := t.piece(pieceIndex(x))
760                 left += int64(p.length() - p.numDirtyBytes())
761                 return true
762         })
763         return
764 }
765
766 // Bytes left to give in tracker announces.
767 func (t *Torrent) bytesLeftAnnounce() int64 {
768         if t.haveInfo() {
769                 return t.bytesLeft()
770         } else {
771                 return -1
772         }
773 }
774
775 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
776         if t.pieceComplete(piece) {
777                 return false
778         }
779         if t.pieceAllDirty(piece) {
780                 return false
781         }
782         return t.pieces[piece].hasDirtyChunks()
783 }
784
785 func (t *Torrent) usualPieceSize() int {
786         return int(t.info.PieceLength)
787 }
788
789 func (t *Torrent) numPieces() pieceIndex {
790         return pieceIndex(t.info.NumPieces())
791 }
792
793 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
794         return pieceIndex(t._completedPieces.GetCardinality())
795 }
796
797 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
798         t.closed.Set()
799         if t.storage != nil {
800                 wg.Add(1)
801                 go func() {
802                         defer wg.Done()
803                         t.storageLock.Lock()
804                         defer t.storageLock.Unlock()
805                         if f := t.storage.Close; f != nil {
806                                 err1 := f()
807                                 if err1 != nil {
808                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
809                                 }
810                         }
811                 }()
812         }
813         t.iterPeers(func(p *Peer) {
814                 p.close()
815         })
816         t.pex.Reset()
817         t.cl.event.Broadcast()
818         t.pieceStateChanges.Close()
819         t.updateWantPeersEvent()
820         return
821 }
822
823 func (t *Torrent) requestOffset(r Request) int64 {
824         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
825 }
826
827 // Return the request that would include the given offset into the torrent data. Returns !ok if
828 // there is no such request.
829 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
830         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
831 }
832
833 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
834         defer perf.ScopeTimerErr(&err)()
835         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
836         if err == nil && n != len(data) {
837                 err = io.ErrShortWrite
838         }
839         return err
840 }
841
842 func (t *Torrent) bitfield() (bf []bool) {
843         bf = make([]bool, t.numPieces())
844         t._completedPieces.Iterate(func(piece uint32) (again bool) {
845                 bf[piece] = true
846                 return true
847         })
848         return
849 }
850
851 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
852         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
853 }
854
855 func (t *Torrent) chunksPerRegularPiece() uint32 {
856         return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
857 }
858
859 func (t *Torrent) numRequests() RequestIndex {
860         if t.numPieces() == 0 {
861                 return 0
862         }
863         return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1)
864 }
865
866 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
867         t.dirtyChunks.RemoveRange(
868                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
869                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
870 }
871
872 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
873         if t.info.PieceLength == 0 {
874                 // There will be no variance amongst pieces. Only pain.
875                 return 0
876         }
877         if piece == t.numPieces()-1 {
878                 ret := pp.Integer(*t.length % t.info.PieceLength)
879                 if ret != 0 {
880                         return ret
881                 }
882         }
883         return pp.Integer(t.info.PieceLength)
884 }
885
886 func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
887         p := t.piece(piece)
888         p.waitNoPendingWrites()
889         storagePiece := t.pieces[piece].Storage()
890
891         //Does the backend want to do its own hashing?
892         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
893                 var sum metainfo.Hash
894                 //log.Printf("A piece decided to self-hash: %d", piece)
895                 sum, err = i.SelfHash()
896                 missinggo.CopyExact(&ret, sum)
897                 return
898         }
899
900         hash := pieceHash.New()
901         const logPieceContents = false
902         if logPieceContents {
903                 var examineBuf bytes.Buffer
904                 _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
905                 log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
906         } else {
907                 _, err = storagePiece.WriteTo(hash)
908         }
909         missinggo.CopyExact(&ret, hash.Sum(nil))
910         return
911 }
912
913 func (t *Torrent) haveAnyPieces() bool {
914         return t._completedPieces.GetCardinality() != 0
915 }
916
917 func (t *Torrent) haveAllPieces() bool {
918         if !t.haveInfo() {
919                 return false
920         }
921         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
922 }
923
924 func (t *Torrent) havePiece(index pieceIndex) bool {
925         return t.haveInfo() && t.pieceComplete(index)
926 }
927
928 func (t *Torrent) maybeDropMutuallyCompletePeer(
929         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
930         // okay?
931         p *Peer,
932 ) {
933         if !t.cl.config.DropMutuallyCompletePeers {
934                 return
935         }
936         if !t.haveAllPieces() {
937                 return
938         }
939         if all, known := p.peerHasAllPieces(); !(known && all) {
940                 return
941         }
942         if p.useful() {
943                 return
944         }
945         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
946         p.drop()
947 }
948
949 func (t *Torrent) haveChunk(r Request) (ret bool) {
950         // defer func() {
951         //      log.Println("have chunk", r, ret)
952         // }()
953         if !t.haveInfo() {
954                 return false
955         }
956         if t.pieceComplete(pieceIndex(r.Index)) {
957                 return true
958         }
959         p := &t.pieces[r.Index]
960         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
961 }
962
963 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
964         return chunkIndexType(cs.Begin / chunkSize)
965 }
966
967 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
968         return t._pendingPieces.Contains(uint32(index))
969 }
970
971 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
972 // conns (which is a map).
973 var peerConnSlices sync.Pool
974
975 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
976 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
977 // established connections for more than a minute. This is O(n log n). If there was a way to not
978 // consider the position of a conn relative to the total number, it could be reduced to O(n).
979 func (t *Torrent) worstBadConn() (ret *PeerConn) {
980         var sl []*PeerConn
981         getInterface := peerConnSlices.Get()
982         if getInterface == nil {
983                 sl = make([]*PeerConn, 0, len(t.conns))
984         } else {
985                 sl = getInterface.([]*PeerConn)[:0]
986         }
987         sl = t.appendUnclosedConns(sl)
988         defer peerConnSlices.Put(sl)
989         wcs := worseConnSlice{sl}
990         heap.Init(&wcs)
991         for wcs.Len() != 0 {
992                 c := heap.Pop(&wcs).(*PeerConn)
993                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
994                         return c
995                 }
996                 // If the connection is in the worst half of the established
997                 // connection quota and is older than a minute.
998                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
999                         // Give connections 1 minute to prove themselves.
1000                         if time.Since(c.completedHandshake) > time.Minute {
1001                                 return c
1002                         }
1003                 }
1004         }
1005         return nil
1006 }
1007
1008 type PieceStateChange struct {
1009         Index int
1010         PieceState
1011 }
1012
1013 func (t *Torrent) publishPieceChange(piece pieceIndex) {
1014         t.cl._mu.Defer(func() {
1015                 cur := t.pieceState(piece)
1016                 p := &t.pieces[piece]
1017                 if cur != p.publicPieceState {
1018                         p.publicPieceState = cur
1019                         t.pieceStateChanges.Publish(PieceStateChange{
1020                                 int(piece),
1021                                 cur,
1022                         })
1023                 }
1024         })
1025 }
1026
1027 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1028         if t.pieceComplete(piece) {
1029                 return 0
1030         }
1031         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1032 }
1033
1034 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1035         return t.pieces[piece].allChunksDirty()
1036 }
1037
1038 func (t *Torrent) readersChanged() {
1039         t.updateReaderPieces()
1040         t.updateAllPiecePriorities("Torrent.readersChanged")
1041 }
1042
1043 func (t *Torrent) updateReaderPieces() {
1044         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1045 }
1046
1047 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1048         if from == to {
1049                 return
1050         }
1051         t.updateReaderPieces()
1052         // Order the ranges, high and low.
1053         l, h := from, to
1054         if l.begin > h.begin {
1055                 l, h = h, l
1056         }
1057         if l.end < h.begin {
1058                 // Two distinct ranges.
1059                 t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged")
1060                 t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged")
1061         } else {
1062                 // Ranges overlap.
1063                 end := l.end
1064                 if h.end > end {
1065                         end = h.end
1066                 }
1067                 t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged")
1068         }
1069 }
1070
1071 func (t *Torrent) maybeNewConns() {
1072         // Tickle the accept routine.
1073         t.cl.event.Broadcast()
1074         t.openNewConns()
1075 }
1076
1077 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
1078         if t._pendingPieces.Contains(uint32(piece)) {
1079                 t.iterPeers(func(c *Peer) {
1080                         if c.actualRequestState.Interested {
1081                                 return
1082                         }
1083                         if !c.isLowOnRequests() {
1084                                 return
1085                         }
1086                         if !c.peerHasPiece(piece) {
1087                                 return
1088                         }
1089                         c.updateRequests(reason)
1090                 })
1091         }
1092         t.maybeNewConns()
1093         t.publishPieceChange(piece)
1094 }
1095
1096 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
1097         p := &t.pieces[piece]
1098         newPrio := p.uncachedPriority()
1099         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1100         if newPrio == PiecePriorityNone {
1101                 if !t._pendingPieces.CheckedRemove(uint32(piece)) {
1102                         return
1103                 }
1104         } else {
1105                 if !t._pendingPieces.CheckedAdd(uint32(piece)) {
1106                         return
1107                 }
1108         }
1109         t.piecePriorityChanged(piece, reason)
1110 }
1111
1112 func (t *Torrent) updateAllPiecePriorities(reason string) {
1113         t.updatePiecePriorities(0, t.numPieces(), reason)
1114 }
1115
1116 // Update all piece priorities in one hit. This function should have the same
1117 // output as updatePiecePriority, but across all pieces.
1118 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) {
1119         for i := begin; i < end; i++ {
1120                 t.updatePiecePriority(i, reason)
1121         }
1122 }
1123
1124 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1125 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1126         if off >= *t.length {
1127                 return
1128         }
1129         if off < 0 {
1130                 size += off
1131                 off = 0
1132         }
1133         if size <= 0 {
1134                 return
1135         }
1136         begin = pieceIndex(off / t.info.PieceLength)
1137         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1138         if end > pieceIndex(t.info.NumPieces()) {
1139                 end = pieceIndex(t.info.NumPieces())
1140         }
1141         return
1142 }
1143
1144 // Returns true if all iterations complete without breaking. Returns the read regions for all
1145 // readers. The reader regions should not be merged as some callers depend on this method to
1146 // enumerate readers.
1147 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1148         for r := range t.readers {
1149                 p := r.pieces
1150                 if p.begin >= p.end {
1151                         continue
1152                 }
1153                 if !f(p.begin, p.end) {
1154                         return false
1155                 }
1156         }
1157         return true
1158 }
1159
1160 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1161         return t.piece(piece).uncachedPriority()
1162 }
1163
1164 func (t *Torrent) pendRequest(req RequestIndex) {
1165         t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
1166 }
1167
1168 func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) {
1169         t.cl.event.Broadcast()
1170         if t.pieceComplete(piece) {
1171                 t.onPieceCompleted(piece)
1172         } else {
1173                 t.onIncompletePiece(piece)
1174         }
1175         t.updatePiecePriority(piece, reason)
1176 }
1177
1178 func (t *Torrent) numReceivedConns() (ret int) {
1179         for c := range t.conns {
1180                 if c.Discovery == PeerSourceIncoming {
1181                         ret++
1182                 }
1183         }
1184         return
1185 }
1186
1187 func (t *Torrent) maxHalfOpen() int {
1188         // Note that if we somehow exceed the maximum established conns, we want
1189         // the negative value to have an effect.
1190         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1191         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1192         // We want to allow some experimentation with new peers, and to try to
1193         // upset an oversupply of received connections.
1194         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1195 }
1196
1197 func (t *Torrent) openNewConns() (initiated int) {
1198         defer t.updateWantPeersEvent()
1199         for t.peers.Len() != 0 {
1200                 if !t.wantConns() {
1201                         return
1202                 }
1203                 if len(t.halfOpen) >= t.maxHalfOpen() {
1204                         return
1205                 }
1206                 if len(t.cl.dialers) == 0 {
1207                         return
1208                 }
1209                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1210                         return
1211                 }
1212                 p := t.peers.PopMax()
1213                 t.initiateConn(p)
1214                 initiated++
1215         }
1216         return
1217 }
1218
1219 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1220         p := t.piece(piece)
1221         uncached := t.pieceCompleteUncached(piece)
1222         cached := p.completion()
1223         changed := cached != uncached
1224         complete := uncached.Complete
1225         p.storageCompletionOk = uncached.Ok
1226         x := uint32(piece)
1227         if complete {
1228                 t._completedPieces.Add(x)
1229         } else {
1230                 t._completedPieces.Remove(x)
1231         }
1232         t.updateComplete()
1233         if complete && len(p.dirtiers) != 0 {
1234                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1235         }
1236         if changed {
1237                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger)
1238                 t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
1239         }
1240         return changed
1241 }
1242
1243 // Non-blocking read. Client lock is not required.
1244 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1245         for len(b) != 0 {
1246                 p := &t.pieces[off/t.info.PieceLength]
1247                 p.waitNoPendingWrites()
1248                 var n1 int
1249                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1250                 if n1 == 0 {
1251                         break
1252                 }
1253                 off += int64(n1)
1254                 n += n1
1255                 b = b[n1:]
1256         }
1257         return
1258 }
1259
1260 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1261 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1262 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1263 func (t *Torrent) maybeCompleteMetadata() error {
1264         if t.haveInfo() {
1265                 // Nothing to do.
1266                 return nil
1267         }
1268         if !t.haveAllMetadataPieces() {
1269                 // Don't have enough metadata pieces.
1270                 return nil
1271         }
1272         err := t.setInfoBytesLocked(t.metadataBytes)
1273         if err != nil {
1274                 t.invalidateMetadata()
1275                 return fmt.Errorf("error setting info bytes: %s", err)
1276         }
1277         if t.cl.config.Debug {
1278                 t.logger.Printf("%s: got metadata from peers", t)
1279         }
1280         return nil
1281 }
1282
1283 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1284         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1285                 if end > begin {
1286                         now.Add(bitmap.BitIndex(begin))
1287                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1288                 }
1289                 return true
1290         })
1291         return
1292 }
1293
1294 func (t *Torrent) needData() bool {
1295         if t.closed.IsSet() {
1296                 return false
1297         }
1298         if !t.haveInfo() {
1299                 return true
1300         }
1301         return !t._pendingPieces.IsEmpty()
1302 }
1303
1304 func appendMissingStrings(old, new []string) (ret []string) {
1305         ret = old
1306 new:
1307         for _, n := range new {
1308                 for _, o := range old {
1309                         if o == n {
1310                                 continue new
1311                         }
1312                 }
1313                 ret = append(ret, n)
1314         }
1315         return
1316 }
1317
1318 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1319         ret = existing
1320         for minNumTiers > len(ret) {
1321                 ret = append(ret, nil)
1322         }
1323         return
1324 }
1325
1326 func (t *Torrent) addTrackers(announceList [][]string) {
1327         fullAnnounceList := &t.metainfo.AnnounceList
1328         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1329         for tierIndex, trackerURLs := range announceList {
1330                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1331         }
1332         t.startMissingTrackerScrapers()
1333         t.updateWantPeersEvent()
1334 }
1335
1336 // Don't call this before the info is available.
1337 func (t *Torrent) bytesCompleted() int64 {
1338         if !t.haveInfo() {
1339                 return 0
1340         }
1341         return *t.length - t.bytesLeft()
1342 }
1343
1344 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1345         t.cl.lock()
1346         defer t.cl.unlock()
1347         return t.setInfoBytesLocked(b)
1348 }
1349
1350 // Returns true if connection is removed from torrent.Conns.
1351 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1352         if !c.closed.IsSet() {
1353                 panic("connection is not closed")
1354                 // There are behaviours prevented by the closed state that will fail
1355                 // if the connection has been deleted.
1356         }
1357         _, ret = t.conns[c]
1358         delete(t.conns, c)
1359         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1360         // the drop event against the PexConnState instead.
1361         if ret {
1362                 if !t.cl.config.DisablePEX {
1363                         t.pex.Drop(c)
1364                 }
1365         }
1366         torrent.Add("deleted connections", 1)
1367         c.deleteAllRequests()
1368         t.assertPendingRequests()
1369         return
1370 }
1371
1372 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1373         if !t.haveInfo() {
1374                 return
1375         }
1376         p.newPeerPieces().Iterate(func(i uint32) bool {
1377                 p.t.decPieceAvailability(pieceIndex(i))
1378                 return true
1379         })
1380 }
1381
1382 func (t *Torrent) assertPendingRequests() {
1383         if !check {
1384                 return
1385         }
1386         var actual pendingRequests
1387         if t.haveInfo() {
1388                 actual.m = make([]int, t.numRequests())
1389         }
1390         t.iterPeers(func(p *Peer) {
1391                 p.actualRequestState.Requests.Iterate(func(x uint32) bool {
1392                         actual.Inc(x)
1393                         return true
1394                 })
1395         })
1396         diff := cmp.Diff(actual.m, t.pendingRequests.m)
1397         if diff != "" {
1398                 panic(diff)
1399         }
1400 }
1401
1402 func (t *Torrent) dropConnection(c *PeerConn) {
1403         t.cl.event.Broadcast()
1404         c.close()
1405         if t.deletePeerConn(c) {
1406                 t.openNewConns()
1407         }
1408 }
1409
1410 func (t *Torrent) wantPeers() bool {
1411         if t.closed.IsSet() {
1412                 return false
1413         }
1414         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1415                 return false
1416         }
1417         return t.needData() || t.seeding()
1418 }
1419
1420 func (t *Torrent) updateWantPeersEvent() {
1421         if t.wantPeers() {
1422                 t.wantPeersEvent.Set()
1423         } else {
1424                 t.wantPeersEvent.Clear()
1425         }
1426 }
1427
1428 // Returns whether the client should make effort to seed the torrent.
1429 func (t *Torrent) seeding() bool {
1430         cl := t.cl
1431         if t.closed.IsSet() {
1432                 return false
1433         }
1434         if t.dataUploadDisallowed {
1435                 return false
1436         }
1437         if cl.config.NoUpload {
1438                 return false
1439         }
1440         if !cl.config.Seed {
1441                 return false
1442         }
1443         if cl.config.DisableAggressiveUpload && t.needData() {
1444                 return false
1445         }
1446         return true
1447 }
1448
1449 func (t *Torrent) onWebRtcConn(
1450         c datachannel.ReadWriteCloser,
1451         dcc webtorrent.DataChannelContext,
1452 ) {
1453         defer c.Close()
1454         pc, err := t.cl.initiateProtocolHandshakes(
1455                 context.Background(),
1456                 webrtcNetConn{c, dcc},
1457                 t,
1458                 dcc.LocalOffered,
1459                 false,
1460                 webrtcNetAddr{dcc.Remote},
1461                 webrtcNetwork,
1462                 fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
1463         )
1464         if err != nil {
1465                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1466                 return
1467         }
1468         if dcc.LocalOffered {
1469                 pc.Discovery = PeerSourceTracker
1470         } else {
1471                 pc.Discovery = PeerSourceIncoming
1472         }
1473         t.cl.lock()
1474         defer t.cl.unlock()
1475         err = t.cl.runHandshookConn(pc, t)
1476         if err != nil {
1477                 t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
1478         }
1479 }
1480
1481 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1482         err := t.cl.runHandshookConn(pc, t)
1483         if err != nil || logAll {
1484                 t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
1485         }
1486 }
1487
1488 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1489         t.logRunHandshookConn(pc, false, log.Debug)
1490 }
1491
1492 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1493         wtc, release := t.cl.websocketTrackers.Get(u.String())
1494         go func() {
1495                 <-t.closed.Done()
1496                 release()
1497         }()
1498         wst := websocketTrackerStatus{u, wtc}
1499         go func() {
1500                 err := wtc.Announce(tracker.Started, t.infoHash)
1501                 if err != nil {
1502                         t.logger.WithDefaultLevel(log.Warning).Printf(
1503                                 "error in initial announce to %q: %v",
1504                                 u.String(), err,
1505                         )
1506                 }
1507         }()
1508         return wst
1509
1510 }
1511
1512 func (t *Torrent) startScrapingTracker(_url string) {
1513         if _url == "" {
1514                 return
1515         }
1516         u, err := url.Parse(_url)
1517         if err != nil {
1518                 // URLs with a leading '*' appear to be a uTorrent convention to
1519                 // disable trackers.
1520                 if _url[0] != '*' {
1521                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1522                 }
1523                 return
1524         }
1525         if u.Scheme == "udp" {
1526                 u.Scheme = "udp4"
1527                 t.startScrapingTracker(u.String())
1528                 u.Scheme = "udp6"
1529                 t.startScrapingTracker(u.String())
1530                 return
1531         }
1532         if _, ok := t.trackerAnnouncers[_url]; ok {
1533                 return
1534         }
1535         sl := func() torrentTrackerAnnouncer {
1536                 switch u.Scheme {
1537                 case "ws", "wss":
1538                         if t.cl.config.DisableWebtorrent {
1539                                 return nil
1540                         }
1541                         return t.startWebsocketAnnouncer(*u)
1542                 case "udp4":
1543                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1544                                 return nil
1545                         }
1546                 case "udp6":
1547                         if t.cl.config.DisableIPv6 {
1548                                 return nil
1549                         }
1550                 }
1551                 newAnnouncer := &trackerScraper{
1552                         u:               *u,
1553                         t:               t,
1554                         lookupTrackerIp: t.cl.config.LookupTrackerIp,
1555                 }
1556                 go newAnnouncer.Run()
1557                 return newAnnouncer
1558         }()
1559         if sl == nil {
1560                 return
1561         }
1562         if t.trackerAnnouncers == nil {
1563                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1564         }
1565         t.trackerAnnouncers[_url] = sl
1566 }
1567
1568 // Adds and starts tracker scrapers for tracker URLs that aren't already
1569 // running.
1570 func (t *Torrent) startMissingTrackerScrapers() {
1571         if t.cl.config.DisableTrackers {
1572                 return
1573         }
1574         t.startScrapingTracker(t.metainfo.Announce)
1575         for _, tier := range t.metainfo.AnnounceList {
1576                 for _, url := range tier {
1577                         t.startScrapingTracker(url)
1578                 }
1579         }
1580 }
1581
1582 // Returns an AnnounceRequest with fields filled out to defaults and current
1583 // values.
1584 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1585         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1586         // dependent on the network in use.
1587         return tracker.AnnounceRequest{
1588                 Event: event,
1589                 NumWant: func() int32 {
1590                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1591                                 return -1
1592                         } else {
1593                                 return 0
1594                         }
1595                 }(),
1596                 Port:     uint16(t.cl.incomingPeerPort()),
1597                 PeerId:   t.cl.peerID,
1598                 InfoHash: t.infoHash,
1599                 Key:      t.cl.announceKey(),
1600
1601                 // The following are vaguely described in BEP 3.
1602
1603                 Left:     t.bytesLeftAnnounce(),
1604                 Uploaded: t.stats.BytesWrittenData.Int64(),
1605                 // There's no mention of wasted or unwanted download in the BEP.
1606                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1607         }
1608 }
1609
1610 // Adds peers revealed in an announce until the announce ends, or we have
1611 // enough peers.
1612 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1613         cl := t.cl
1614         for v := range pvs {
1615                 cl.lock()
1616                 added := 0
1617                 for _, cp := range v.Peers {
1618                         if cp.Port == 0 {
1619                                 // Can't do anything with this.
1620                                 continue
1621                         }
1622                         if t.addPeer(PeerInfo{
1623                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1624                                 Source: PeerSourceDhtGetPeers,
1625                         }) {
1626                                 added++
1627                         }
1628                 }
1629                 cl.unlock()
1630                 // if added != 0 {
1631                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1632                 // }
1633         }
1634 }
1635
1636 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1637 // announce ends. stop will force the announce to end.
1638 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1639         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1640         if err != nil {
1641                 return
1642         }
1643         _done := make(chan struct{})
1644         done = _done
1645         stop = ps.Close
1646         go func() {
1647                 t.consumeDhtAnnouncePeers(ps.Peers())
1648                 close(_done)
1649         }()
1650         return
1651 }
1652
1653 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1654         _, stop, err := t.AnnounceToDht(s)
1655         if err != nil {
1656                 return err
1657         }
1658         select {
1659         case <-t.closed.Done():
1660         case <-time.After(5 * time.Minute):
1661         }
1662         stop()
1663         return nil
1664 }
1665
1666 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1667         cl := t.cl
1668         cl.lock()
1669         defer cl.unlock()
1670         for {
1671                 for {
1672                         if t.closed.IsSet() {
1673                                 return
1674                         }
1675                         if !t.wantPeers() {
1676                                 goto wait
1677                         }
1678                         // TODO: Determine if there's a listener on the port we're announcing.
1679                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1680                                 goto wait
1681                         }
1682                         break
1683                 wait:
1684                         cl.event.Wait()
1685                 }
1686                 func() {
1687                         t.numDHTAnnounces++
1688                         cl.unlock()
1689                         defer cl.lock()
1690                         err := t.timeboxedAnnounceToDht(s)
1691                         if err != nil {
1692                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1693                         }
1694                 }()
1695         }
1696 }
1697
1698 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1699         for _, p := range peers {
1700                 if t.addPeer(p) {
1701                         added++
1702                 }
1703         }
1704         return
1705 }
1706
1707 // The returned TorrentStats may require alignment in memory. See
1708 // https://github.com/anacrolix/torrent/issues/383.
1709 func (t *Torrent) Stats() TorrentStats {
1710         t.cl.rLock()
1711         defer t.cl.rUnlock()
1712         return t.statsLocked()
1713 }
1714
1715 func (t *Torrent) statsLocked() (ret TorrentStats) {
1716         ret.ActivePeers = len(t.conns)
1717         ret.HalfOpenPeers = len(t.halfOpen)
1718         ret.PendingPeers = t.peers.Len()
1719         ret.TotalPeers = t.numTotalPeers()
1720         ret.ConnectedSeeders = 0
1721         for c := range t.conns {
1722                 if all, ok := c.peerHasAllPieces(); all && ok {
1723                         ret.ConnectedSeeders++
1724                 }
1725         }
1726         ret.ConnStats = t.stats.Copy()
1727         ret.PiecesComplete = t.numPiecesCompleted()
1728         return
1729 }
1730
1731 // The total number of peers in the torrent.
1732 func (t *Torrent) numTotalPeers() int {
1733         peers := make(map[string]struct{})
1734         for conn := range t.conns {
1735                 ra := conn.conn.RemoteAddr()
1736                 if ra == nil {
1737                         // It's been closed and doesn't support RemoteAddr.
1738                         continue
1739                 }
1740                 peers[ra.String()] = struct{}{}
1741         }
1742         for addr := range t.halfOpen {
1743                 peers[addr] = struct{}{}
1744         }
1745         t.peers.Each(func(peer PeerInfo) {
1746                 peers[peer.Addr.String()] = struct{}{}
1747         })
1748         return len(peers)
1749 }
1750
1751 // Reconcile bytes transferred before connection was associated with a
1752 // torrent.
1753 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1754         if c._stats != (ConnStats{
1755                 // Handshakes should only increment these fields:
1756                 BytesWritten: c._stats.BytesWritten,
1757                 BytesRead:    c._stats.BytesRead,
1758         }) {
1759                 panic("bad stats")
1760         }
1761         c.postHandshakeStats(func(cs *ConnStats) {
1762                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1763                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1764         })
1765         c.reconciledHandshakeStats = true
1766 }
1767
1768 // Returns true if the connection is added.
1769 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1770         defer func() {
1771                 if err == nil {
1772                         torrent.Add("added connections", 1)
1773                 }
1774         }()
1775         if t.closed.IsSet() {
1776                 return errors.New("torrent closed")
1777         }
1778         for c0 := range t.conns {
1779                 if c.PeerID != c0.PeerID {
1780                         continue
1781                 }
1782                 if !t.cl.config.DropDuplicatePeerIds {
1783                         continue
1784                 }
1785                 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1786                         c0.close()
1787                         t.deletePeerConn(c0)
1788                 } else {
1789                         return errors.New("existing connection preferred")
1790                 }
1791         }
1792         if len(t.conns) >= t.maxEstablishedConns {
1793                 c := t.worstBadConn()
1794                 if c == nil {
1795                         return errors.New("don't want conns")
1796                 }
1797                 c.close()
1798                 t.deletePeerConn(c)
1799         }
1800         if len(t.conns) >= t.maxEstablishedConns {
1801                 panic(len(t.conns))
1802         }
1803         t.conns[c] = struct{}{}
1804         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
1805                 t.pex.Add(c) // as no further extended handshake expected
1806         }
1807         return nil
1808 }
1809
1810 func (t *Torrent) wantConns() bool {
1811         if !t.networkingEnabled.Bool() {
1812                 return false
1813         }
1814         if t.closed.IsSet() {
1815                 return false
1816         }
1817         if !t.seeding() && !t.needData() {
1818                 return false
1819         }
1820         if len(t.conns) < t.maxEstablishedConns {
1821                 return true
1822         }
1823         return t.worstBadConn() != nil
1824 }
1825
1826 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1827         t.cl.lock()
1828         defer t.cl.unlock()
1829         oldMax = t.maxEstablishedConns
1830         t.maxEstablishedConns = max
1831         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
1832                 return worseConn(&l.Peer, &r.Peer)
1833         })
1834         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1835                 t.dropConnection(wcs.Pop().(*PeerConn))
1836         }
1837         t.openNewConns()
1838         return oldMax
1839 }
1840
1841 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
1842         t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, passed).SetLevel(log.Debug))
1843         p := t.piece(piece)
1844         p.numVerifies++
1845         t.cl.event.Broadcast()
1846         if t.closed.IsSet() {
1847                 return
1848         }
1849
1850         // Don't score the first time a piece is hashed, it could be an initial check.
1851         if p.storageCompletionOk {
1852                 if passed {
1853                         pieceHashedCorrect.Add(1)
1854                 } else {
1855                         log.Fmsg(
1856                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
1857                         ).AddValues(t, p).SetLevel(log.Debug).Log(t.logger)
1858                         pieceHashedNotCorrect.Add(1)
1859                 }
1860         }
1861
1862         p.marking = true
1863         t.publishPieceChange(piece)
1864         defer func() {
1865                 p.marking = false
1866                 t.publishPieceChange(piece)
1867         }()
1868
1869         if passed {
1870                 if len(p.dirtiers) != 0 {
1871                         // Don't increment stats above connection-level for every involved connection.
1872                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1873                 }
1874                 for c := range p.dirtiers {
1875                         c._stats.incrementPiecesDirtiedGood()
1876                 }
1877                 t.clearPieceTouchers(piece)
1878                 t.cl.unlock()
1879                 err := p.Storage().MarkComplete()
1880                 if err != nil {
1881                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1882                 }
1883                 t.cl.lock()
1884
1885                 if t.closed.IsSet() {
1886                         return
1887                 }
1888                 t.pendAllChunkSpecs(piece)
1889         } else {
1890                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
1891                         // Peers contributed to all the data for this piece hash failure, and the failure was
1892                         // not due to errors in the storage (such as data being dropped in a cache).
1893
1894                         // Increment Torrent and above stats, and then specific connections.
1895                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1896                         for c := range p.dirtiers {
1897                                 // Y u do dis peer?!
1898                                 c.stats().incrementPiecesDirtiedBad()
1899                         }
1900
1901                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
1902                         for c := range p.dirtiers {
1903                                 if !c.trusted {
1904                                         bannableTouchers = append(bannableTouchers, c)
1905                                 }
1906                         }
1907                         t.clearPieceTouchers(piece)
1908                         slices.Sort(bannableTouchers, connLessTrusted)
1909
1910                         if t.cl.config.Debug {
1911                                 t.logger.Printf(
1912                                         "bannable conns by trust for piece %d: %v",
1913                                         piece,
1914                                         func() (ret []connectionTrust) {
1915                                                 for _, c := range bannableTouchers {
1916                                                         ret = append(ret, c.trust())
1917                                                 }
1918                                                 return
1919                                         }(),
1920                                 )
1921                         }
1922
1923                         if len(bannableTouchers) >= 1 {
1924                                 c := bannableTouchers[0]
1925                                 t.cl.banPeerIP(c.remoteIp())
1926                                 c.drop()
1927                         }
1928                 }
1929                 t.onIncompletePiece(piece)
1930                 p.Storage().MarkNotComplete()
1931         }
1932         t.updatePieceCompletion(piece)
1933 }
1934
1935 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
1936         // TODO: Make faster
1937         for cn := range t.conns {
1938                 cn.tickleWriter()
1939         }
1940 }
1941
1942 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
1943         t.pendAllChunkSpecs(piece)
1944         t.cancelRequestsForPiece(piece)
1945         t.piece(piece).readerCond.Broadcast()
1946         for conn := range t.conns {
1947                 conn.have(piece)
1948                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
1949         }
1950 }
1951
1952 // Called when a piece is found to be not complete.
1953 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
1954         if t.pieceAllDirty(piece) {
1955                 t.pendAllChunkSpecs(piece)
1956         }
1957         if !t.wantPieceIndex(piece) {
1958                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
1959                 return
1960         }
1961         // We could drop any connections that we told we have a piece that we
1962         // don't here. But there's a test failure, and it seems clients don't care
1963         // if you request pieces that you already claim to have. Pruning bad
1964         // connections might just remove any connections that aren't treating us
1965         // favourably anyway.
1966
1967         // for c := range t.conns {
1968         //      if c.sentHave(piece) {
1969         //              c.drop()
1970         //      }
1971         // }
1972         t.iterPeers(func(conn *Peer) {
1973                 if conn.peerHasPiece(piece) {
1974                         conn.updateRequests("piece incomplete")
1975                 }
1976         })
1977 }
1978
1979 func (t *Torrent) tryCreateMorePieceHashers() {
1980         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
1981         }
1982 }
1983
1984 func (t *Torrent) tryCreatePieceHasher() bool {
1985         if t.storage == nil {
1986                 return false
1987         }
1988         pi, ok := t.getPieceToHash()
1989         if !ok {
1990                 return false
1991         }
1992         p := t.piece(pi)
1993         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
1994         p.hashing = true
1995         t.publishPieceChange(pi)
1996         t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher")
1997         t.storageLock.RLock()
1998         t.activePieceHashes++
1999         go t.pieceHasher(pi)
2000         return true
2001 }
2002
2003 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2004         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2005                 if t.piece(i).hashing {
2006                         return true
2007                 }
2008                 ret = i
2009                 ok = true
2010                 return false
2011         })
2012         return
2013 }
2014
2015 func (t *Torrent) pieceHasher(index pieceIndex) {
2016         p := t.piece(index)
2017         sum, copyErr := t.hashPiece(index)
2018         correct := sum == *p.hash
2019         switch copyErr {
2020         case nil, io.EOF:
2021         default:
2022                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2023         }
2024         t.storageLock.RUnlock()
2025         t.cl.lock()
2026         defer t.cl.unlock()
2027         p.hashing = false
2028         t.pieceHashed(index, correct, copyErr)
2029         t.updatePiecePriority(index, "Torrent.pieceHasher")
2030         t.activePieceHashes--
2031         t.tryCreateMorePieceHashers()
2032 }
2033
2034 // Return the connections that touched a piece, and clear the entries while doing it.
2035 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2036         p := t.piece(pi)
2037         for c := range p.dirtiers {
2038                 delete(c.peerTouchedPieces, pi)
2039                 delete(p.dirtiers, c)
2040         }
2041 }
2042
2043 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2044         t.iterPeers(func(p *Peer) {
2045                 ret = append(ret, p)
2046         })
2047         return
2048 }
2049
2050 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2051         piece := t.piece(pieceIndex)
2052         if piece.queuedForHash() {
2053                 return
2054         }
2055         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2056         t.publishPieceChange(pieceIndex)
2057         t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
2058         t.tryCreateMorePieceHashers()
2059 }
2060
2061 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2062 // before the Info is available.
2063 func (t *Torrent) VerifyData() {
2064         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2065                 t.Piece(i).VerifyData()
2066         }
2067 }
2068
2069 // Start the process of connecting to the given peer for the given torrent if appropriate.
2070 func (t *Torrent) initiateConn(peer PeerInfo) {
2071         if peer.Id == t.cl.peerID {
2072                 return
2073         }
2074         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2075                 return
2076         }
2077         addr := peer.Addr
2078         if t.addrActive(addr.String()) {
2079                 return
2080         }
2081         t.cl.numHalfOpen++
2082         t.halfOpen[addr.String()] = peer
2083         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
2084 }
2085
2086 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2087 // quickly make one Client visible to the Torrent of another Client.
2088 func (t *Torrent) AddClientPeer(cl *Client) int {
2089         return t.AddPeers(func() (ps []PeerInfo) {
2090                 for _, la := range cl.ListenAddrs() {
2091                         ps = append(ps, PeerInfo{
2092                                 Addr:    la,
2093                                 Trusted: true,
2094                         })
2095                 }
2096                 return
2097         }())
2098 }
2099
2100 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2101 // connection.
2102 func (t *Torrent) allStats(f func(*ConnStats)) {
2103         f(&t.stats)
2104         f(&t.cl.stats)
2105 }
2106
2107 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2108         return t.pieces[i].hashing
2109 }
2110
2111 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2112         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2113 }
2114
2115 func (t *Torrent) dialTimeout() time.Duration {
2116         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2117 }
2118
2119 func (t *Torrent) piece(i int) *Piece {
2120         return &t.pieces[i]
2121 }
2122
2123 func (t *Torrent) onWriteChunkErr(err error) {
2124         if t.userOnWriteChunkErr != nil {
2125                 go t.userOnWriteChunkErr(err)
2126                 return
2127         }
2128         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2129         t.disallowDataDownloadLocked()
2130 }
2131
2132 func (t *Torrent) DisallowDataDownload() {
2133         t.disallowDataDownloadLocked()
2134 }
2135
2136 func (t *Torrent) disallowDataDownloadLocked() {
2137         t.dataDownloadDisallowed.Set()
2138 }
2139
2140 func (t *Torrent) AllowDataDownload() {
2141         t.dataDownloadDisallowed.Clear()
2142 }
2143
2144 // Enables uploading data, if it was disabled.
2145 func (t *Torrent) AllowDataUpload() {
2146         t.cl.lock()
2147         defer t.cl.unlock()
2148         t.dataUploadDisallowed = false
2149         for c := range t.conns {
2150                 c.updateRequests("allow data upload")
2151         }
2152 }
2153
2154 // Disables uploading data, if it was enabled.
2155 func (t *Torrent) DisallowDataUpload() {
2156         t.cl.lock()
2157         defer t.cl.unlock()
2158         t.dataUploadDisallowed = true
2159         for c := range t.conns {
2160                 c.updateRequests("disallow data upload")
2161         }
2162 }
2163
2164 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2165 // or if nil, a critical message is logged, and data download is disabled.
2166 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2167         t.cl.lock()
2168         defer t.cl.unlock()
2169         t.userOnWriteChunkErr = f
2170 }
2171
2172 func (t *Torrent) iterPeers(f func(p *Peer)) {
2173         for pc := range t.conns {
2174                 f(&pc.Peer)
2175         }
2176         for _, ws := range t.webSeeds {
2177                 f(ws)
2178         }
2179 }
2180
2181 func (t *Torrent) callbacks() *Callbacks {
2182         return &t.cl.config.Callbacks
2183 }
2184
2185 var WebseedHttpClient = &http.Client{
2186         Transport: &http.Transport{
2187                 MaxConnsPerHost: 10,
2188         },
2189 }
2190
2191 func (t *Torrent) addWebSeed(url string) {
2192         if t.cl.config.DisableWebseeds {
2193                 return
2194         }
2195         if _, ok := t.webSeeds[url]; ok {
2196                 return
2197         }
2198         const maxRequests = 10
2199         ws := webseedPeer{
2200                 peer: Peer{
2201                         t:                        t,
2202                         outgoing:                 true,
2203                         Network:                  "http",
2204                         reconciledHandshakeStats: true,
2205                         // TODO: Raise this limit, and instead limit concurrent fetches.
2206                         PeerMaxRequests: 32,
2207                         RemoteAddr:      remoteAddrFromUrl(url),
2208                         callbacks:       t.callbacks(),
2209                 },
2210                 client: webseed.Client{
2211                         // Consider a MaxConnsPerHost in the transport for this, possibly in a global Client.
2212                         HttpClient: WebseedHttpClient,
2213                         Url:        url,
2214                 },
2215                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2216                 maxRequests:    maxRequests,
2217         }
2218         ws.peer.initUpdateRequestsTimer()
2219         ws.requesterCond.L = t.cl.locker()
2220         for i := 0; i < maxRequests; i += 1 {
2221                 go ws.requester()
2222         }
2223         for _, f := range t.callbacks().NewPeer {
2224                 f(&ws.peer)
2225         }
2226         ws.peer.logger = t.logger.WithContextValue(&ws)
2227         ws.peer.peerImpl = &ws
2228         if t.haveInfo() {
2229                 ws.onGotInfo(t.info)
2230         }
2231         t.webSeeds[url] = &ws.peer
2232         ws.peer.onPeerHasAllPieces()
2233 }
2234
2235 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2236         t.iterPeers(func(p1 *Peer) {
2237                 if p1 == p {
2238                         active = true
2239                 }
2240         })
2241         return
2242 }
2243
2244 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2245         index := ri / t.chunksPerRegularPiece()
2246         return Request{
2247                 pp.Integer(index),
2248                 t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2249         }
2250 }
2251
2252 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2253         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
2254 }
2255
2256 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2257         return RequestIndex(piece) * t.chunksPerRegularPiece()
2258 }
2259
2260 func (t *Torrent) updateComplete() {
2261         t.Complete.SetBool(t.haveAllPieces())
2262 }