]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
d184afd3f1f5fe831c3b1bf0b6e4c0632b08bf3b
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "math/rand"
12         "net/http"
13         "net/url"
14         "sort"
15         "strings"
16         "text/tabwriter"
17         "time"
18         "unsafe"
19
20         "github.com/RoaringBitmap/roaring"
21         "github.com/anacrolix/chansync"
22         "github.com/anacrolix/dht/v2"
23         "github.com/anacrolix/log"
24         "github.com/anacrolix/missinggo/perf"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/missinggo/slices"
27         "github.com/anacrolix/missinggo/v2"
28         "github.com/anacrolix/missinggo/v2/bitmap"
29         "github.com/anacrolix/missinggo/v2/prioritybitmap"
30         "github.com/anacrolix/multiless"
31         "github.com/anacrolix/sync"
32         "github.com/davecgh/go-spew/spew"
33         "github.com/pion/datachannel"
34
35         "github.com/anacrolix/torrent/bencode"
36         "github.com/anacrolix/torrent/common"
37         "github.com/anacrolix/torrent/metainfo"
38         pp "github.com/anacrolix/torrent/peer_protocol"
39         "github.com/anacrolix/torrent/segments"
40         "github.com/anacrolix/torrent/storage"
41         "github.com/anacrolix/torrent/tracker"
42         "github.com/anacrolix/torrent/webseed"
43         "github.com/anacrolix/torrent/webtorrent"
44 )
45
46 // Maintains state of torrent within a Client. Many methods should not be called before the info is
47 // available, see .Info and .GotInfo.
48 type Torrent struct {
49         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
50         // alignment. See #262.
51         stats  ConnStats
52         cl     *Client
53         logger log.Logger
54
55         networkingEnabled      chansync.Flag
56         dataDownloadDisallowed chansync.Flag
57         dataUploadDisallowed   bool
58         userOnWriteChunkErr    func(error)
59
60         closed   chansync.SetOnce
61         infoHash metainfo.Hash
62         pieces   []Piece
63         // Values are the piece indices that changed.
64         pieceStateChanges *pubsub.PubSub
65         // The size of chunks to request from peers over the wire. This is
66         // normally 16KiB by convention these days.
67         chunkSize pp.Integer
68         chunkPool sync.Pool
69         // Total length of the torrent in bytes. Stored because it's not O(1) to
70         // get this from the info dict.
71         length *int64
72
73         // The storage to open when the info dict becomes available.
74         storageOpener *storage.Client
75         // Storage for torrent data.
76         storage *storage.Torrent
77         // Read-locked for using storage, and write-locked for Closing.
78         storageLock sync.RWMutex
79
80         // TODO: Only announce stuff is used?
81         metainfo metainfo.MetaInfo
82
83         // The info dict. nil if we don't have it (yet).
84         info      *metainfo.Info
85         fileIndex segments.Index
86         files     *[]*File
87
88         webSeeds map[string]*Peer
89
90         // Active peer connections, running message stream loops. TODO: Make this
91         // open (not-closed) connections only.
92         conns               map[*PeerConn]struct{}
93         maxEstablishedConns int
94         // Set of addrs to which we're attempting to connect. Connections are
95         // half-open until all handshakes are completed.
96         halfOpen map[string]PeerInfo
97
98         // Reserve of peers to connect to. A peer can be both here and in the
99         // active connections if were told about the peer after connecting with
100         // them. That encourages us to reconnect to peers that are well known in
101         // the swarm.
102         peers prioritizedPeers
103         // Whether we want to know to know more peers.
104         wantPeersEvent missinggo.Event
105         // An announcer for each tracker URL.
106         trackerAnnouncers map[string]torrentTrackerAnnouncer
107         // How many times we've initiated a DHT announce. TODO: Move into stats.
108         numDHTAnnounces int
109
110         // Name used if the info name isn't available. Should be cleared when the
111         // Info does become available.
112         nameMu      sync.RWMutex
113         displayName string
114
115         // The bencoded bytes of the info dict. This is actively manipulated if
116         // the info bytes aren't initially available, and we try to fetch them
117         // from peers.
118         metadataBytes []byte
119         // Each element corresponds to the 16KiB metadata pieces. If true, we have
120         // received that piece.
121         metadataCompletedChunks []bool
122         metadataChanged         sync.Cond
123
124         // Closed when .Info is obtained.
125         gotMetainfoC chan struct{}
126
127         readers                map[*reader]struct{}
128         _readerNowPieces       bitmap.Bitmap
129         _readerReadaheadPieces bitmap.Bitmap
130
131         // A cache of pieces we need to get. Calculated from various piece and
132         // file priorities and completion states elsewhere.
133         _pendingPieces prioritybitmap.PriorityBitmap
134         // A cache of completed piece indices.
135         _completedPieces roaring.Bitmap
136         // Pieces that need to be hashed.
137         piecesQueuedForHash bitmap.Bitmap
138         activePieceHashes   int
139
140         // A pool of piece priorities []int for assignment to new connections.
141         // These "inclinations" are used to give connections preference for
142         // different pieces.
143         connPieceInclinationPool sync.Pool
144
145         // Count of each request across active connections.
146         pendingRequests map[RequestIndex]int
147         // Chunks we've written to since the corresponding piece was last checked.
148         dirtyChunks roaring.Bitmap
149
150         pex pexState
151 }
152
153 func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
154         t.iterPeers(func(peer *Peer) {
155                 if peer.peerHasPiece(i) {
156                         count++
157                 }
158         })
159         return
160 }
161
162 func (t *Torrent) decPieceAvailability(i pieceIndex) {
163         if !t.haveInfo() {
164                 return
165         }
166         p := t.piece(i)
167         if p.availability <= 0 {
168                 panic(p.availability)
169         }
170         p.availability--
171 }
172
173 func (t *Torrent) incPieceAvailability(i pieceIndex) {
174         // If we don't the info, this should be reconciled when we do.
175         if t.haveInfo() {
176                 p := t.piece(i)
177                 p.availability++
178         }
179 }
180
181 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
182         return t._readerNowPieces
183 }
184
185 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
186         return t._readerReadaheadPieces
187 }
188
189 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
190         return !t.wantPieceIndex(i)
191 }
192
193 func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
194         return &t._pendingPieces
195 }
196
197 // Returns a channel that is closed when the Torrent is closed.
198 func (t *Torrent) Closed() chansync.Done {
199         return t.closed.Done()
200 }
201
202 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
203 // pending, and half-open peers.
204 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
205         // Add pending peers to the list
206         t.peers.Each(func(peer PeerInfo) {
207                 ks = append(ks, peer)
208         })
209
210         // Add half-open peers to the list
211         for _, peer := range t.halfOpen {
212                 ks = append(ks, peer)
213         }
214
215         // Add active peers to the list
216         for conn := range t.conns {
217
218                 ks = append(ks, PeerInfo{
219                         Id:     conn.PeerID,
220                         Addr:   conn.RemoteAddr,
221                         Source: conn.Discovery,
222                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
223                         // > But if we're not connected to them with an encrypted connection, I couldn't say
224                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
225                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
226                         // > connection. It's probably easiest to do the latter for now.
227                         // https://github.com/anacrolix/torrent/pull/188
228                         SupportsEncryption: conn.headerEncrypted,
229                 })
230         }
231
232         return
233 }
234
235 func (t *Torrent) setChunkSize(size pp.Integer) {
236         t.chunkSize = size
237         t.chunkPool = sync.Pool{
238                 New: func() interface{} {
239                         b := make([]byte, size)
240                         return &b
241                 },
242         }
243 }
244
245 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
246         return t._completedPieces.Contains(bitmap.BitIndex(piece))
247 }
248
249 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
250         return t.pieces[piece].Storage().Completion()
251 }
252
253 // There's a connection to that address already.
254 func (t *Torrent) addrActive(addr string) bool {
255         if _, ok := t.halfOpen[addr]; ok {
256                 return true
257         }
258         for c := range t.conns {
259                 ra := c.RemoteAddr
260                 if ra.String() == addr {
261                         return true
262                 }
263         }
264         return false
265 }
266
267 func (t *Torrent) appendUnclosedConns(ret []*PeerConn) []*PeerConn {
268         for c := range t.conns {
269                 if !c.closed.IsSet() {
270                         ret = append(ret, c)
271                 }
272         }
273         return ret
274 }
275
276 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
277         cl := t.cl
278         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
279         if t.closed.IsSet() {
280                 return false
281         }
282         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
283                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
284                         torrent.Add("peers not added because of bad addr", 1)
285                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
286                         return false
287                 }
288         }
289         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
290                 torrent.Add("peers replaced", 1)
291                 if !replaced.equal(p) {
292                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
293                         added = true
294                 }
295         } else {
296                 added = true
297         }
298         t.openNewConns()
299         for t.peers.Len() > cl.config.TorrentPeersHighWater {
300                 _, ok := t.peers.DeleteMin()
301                 if ok {
302                         torrent.Add("excess reserve peers discarded", 1)
303                 }
304         }
305         return
306 }
307
308 func (t *Torrent) invalidateMetadata() {
309         for i := 0; i < len(t.metadataCompletedChunks); i++ {
310                 t.metadataCompletedChunks[i] = false
311         }
312         t.nameMu.Lock()
313         t.gotMetainfoC = make(chan struct{})
314         t.info = nil
315         t.nameMu.Unlock()
316 }
317
318 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
319         if t.haveInfo() {
320                 return
321         }
322         if index >= len(t.metadataCompletedChunks) {
323                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
324                 return
325         }
326         copy(t.metadataBytes[(1<<14)*index:], data)
327         t.metadataCompletedChunks[index] = true
328 }
329
330 func (t *Torrent) metadataPieceCount() int {
331         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
332 }
333
334 func (t *Torrent) haveMetadataPiece(piece int) bool {
335         if t.haveInfo() {
336                 return (1<<14)*piece < len(t.metadataBytes)
337         } else {
338                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
339         }
340 }
341
342 func (t *Torrent) metadataSize() int {
343         return len(t.metadataBytes)
344 }
345
346 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
347         for i := 0; i < len(info.Pieces); i += sha1.Size {
348                 ret = append(ret, info.Pieces[i:i+sha1.Size])
349         }
350         return
351 }
352
353 func (t *Torrent) makePieces() {
354         hashes := infoPieceHashes(t.info)
355         t.pieces = make([]Piece, len(hashes))
356         for i, hash := range hashes {
357                 piece := &t.pieces[i]
358                 piece.t = t
359                 piece.index = pieceIndex(i)
360                 piece.noPendingWrites.L = &piece.pendingWritesMutex
361                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
362                 files := *t.files
363                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
364                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
365                 piece.files = files[beginFile:endFile]
366         }
367 }
368
369 // Returns the index of the first file containing the piece. files must be
370 // ordered by offset.
371 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
372         for i, f := range files {
373                 if f.offset+f.length > pieceOffset {
374                         return i
375                 }
376         }
377         return 0
378 }
379
380 // Returns the index after the last file containing the piece. files must be
381 // ordered by offset.
382 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
383         for i, f := range files {
384                 if f.offset+f.length >= pieceEndOffset {
385                         return i + 1
386                 }
387         }
388         return 0
389 }
390
391 func (t *Torrent) cacheLength() {
392         var l int64
393         for _, f := range t.info.UpvertedFiles() {
394                 l += f.Length
395         }
396         t.length = &l
397 }
398
399 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
400 // separately.
401 func (t *Torrent) setInfo(info *metainfo.Info) error {
402         if err := validateInfo(info); err != nil {
403                 return fmt.Errorf("bad info: %s", err)
404         }
405         if t.storageOpener != nil {
406                 var err error
407                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
408                 if err != nil {
409                         return fmt.Errorf("error opening torrent storage: %s", err)
410                 }
411         }
412         t.nameMu.Lock()
413         t.info = info
414         t.nameMu.Unlock()
415         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
416         t.displayName = "" // Save a few bytes lol.
417         t.initFiles()
418         t.cacheLength()
419         t.makePieces()
420         return nil
421 }
422
423 // This seems to be all the follow-up tasks after info is set, that can't fail.
424 func (t *Torrent) onSetInfo() {
425         t.iterPeers(func(p *Peer) {
426                 p.onGotInfo(t.info)
427         })
428         for i := range t.pieces {
429                 p := &t.pieces[i]
430                 // Need to add availability before updating piece completion, as that may result in conns
431                 // being dropped.
432                 if p.availability != 0 {
433                         panic(p.availability)
434                 }
435                 p.availability = int64(t.pieceAvailabilityFromPeers(i))
436                 t.updatePieceCompletion(pieceIndex(i))
437                 if !p.storageCompletionOk {
438                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
439                         t.queuePieceCheck(pieceIndex(i))
440                 }
441         }
442         t.cl.event.Broadcast()
443         close(t.gotMetainfoC)
444         t.updateWantPeersEvent()
445         t.pendingRequests = make(map[RequestIndex]int)
446         t.tryCreateMorePieceHashers()
447 }
448
449 // Called when metadata for a torrent becomes available.
450 func (t *Torrent) setInfoBytesLocked(b []byte) error {
451         if metainfo.HashBytes(b) != t.infoHash {
452                 return errors.New("info bytes have wrong hash")
453         }
454         var info metainfo.Info
455         if err := bencode.Unmarshal(b, &info); err != nil {
456                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
457         }
458         t.metadataBytes = b
459         t.metadataCompletedChunks = nil
460         if t.info != nil {
461                 return nil
462         }
463         if err := t.setInfo(&info); err != nil {
464                 return err
465         }
466         t.onSetInfo()
467         return nil
468 }
469
470 func (t *Torrent) haveAllMetadataPieces() bool {
471         if t.haveInfo() {
472                 return true
473         }
474         if t.metadataCompletedChunks == nil {
475                 return false
476         }
477         for _, have := range t.metadataCompletedChunks {
478                 if !have {
479                         return false
480                 }
481         }
482         return true
483 }
484
485 // TODO: Propagate errors to disconnect peer.
486 func (t *Torrent) setMetadataSize(size int) (err error) {
487         if t.haveInfo() {
488                 // We already know the correct metadata size.
489                 return
490         }
491         if uint32(size) > maxMetadataSize {
492                 return errors.New("bad size")
493         }
494         if len(t.metadataBytes) == size {
495                 return
496         }
497         t.metadataBytes = make([]byte, size)
498         t.metadataCompletedChunks = make([]bool, (size+(1<<14)-1)/(1<<14))
499         t.metadataChanged.Broadcast()
500         for c := range t.conns {
501                 c.requestPendingMetadata()
502         }
503         return
504 }
505
506 // The current working name for the torrent. Either the name in the info dict,
507 // or a display name given such as by the dn value in a magnet link, or "".
508 func (t *Torrent) name() string {
509         t.nameMu.RLock()
510         defer t.nameMu.RUnlock()
511         if t.haveInfo() {
512                 return t.info.Name
513         }
514         if t.displayName != "" {
515                 return t.displayName
516         }
517         return "infohash:" + t.infoHash.HexString()
518 }
519
520 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
521         p := &t.pieces[index]
522         ret.Priority = t.piecePriority(index)
523         ret.Completion = p.completion()
524         ret.QueuedForHash = p.queuedForHash()
525         ret.Hashing = p.hashing
526         ret.Checking = ret.QueuedForHash || ret.Hashing
527         ret.Marking = p.marking
528         if !ret.Complete && t.piecePartiallyDownloaded(index) {
529                 ret.Partial = true
530         }
531         return
532 }
533
534 func (t *Torrent) metadataPieceSize(piece int) int {
535         return metadataPieceSize(len(t.metadataBytes), piece)
536 }
537
538 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMetadataRequestMsgType, piece int, data []byte) pp.Message {
539         return pp.Message{
540                 Type:       pp.Extended,
541                 ExtendedID: c.PeerExtensionIDs[pp.ExtensionNameMetadata],
542                 ExtendedPayload: append(bencode.MustMarshal(pp.ExtendedMetadataRequestMsg{
543                         Piece:     piece,
544                         TotalSize: len(t.metadataBytes),
545                         Type:      msgType,
546                 }), data...),
547         }
548 }
549
550 type pieceAvailabilityRun struct {
551         count        pieceIndex
552         availability int64
553 }
554
555 func (me pieceAvailabilityRun) String() string {
556         return fmt.Sprintf("%v(%v)", me.count, me.availability)
557 }
558
559 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
560         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
561                 ret = append(ret, pieceAvailabilityRun{availability: el.(int64), count: int(count)})
562         })
563         for i := range t.pieces {
564                 rle.Append(t.pieces[i].availability, 1)
565         }
566         rle.Flush()
567         return
568 }
569
570 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
571         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
572                 ret = append(ret, PieceStateRun{
573                         PieceState: el.(PieceState),
574                         Length:     int(count),
575                 })
576         })
577         for index := range t.pieces {
578                 rle.Append(t.pieceState(pieceIndex(index)), 1)
579         }
580         rle.Flush()
581         return
582 }
583
584 // Produces a small string representing a PieceStateRun.
585 func (psr PieceStateRun) String() (ret string) {
586         ret = fmt.Sprintf("%d", psr.Length)
587         ret += func() string {
588                 switch psr.Priority {
589                 case PiecePriorityNext:
590                         return "N"
591                 case PiecePriorityNormal:
592                         return "."
593                 case PiecePriorityReadahead:
594                         return "R"
595                 case PiecePriorityNow:
596                         return "!"
597                 case PiecePriorityHigh:
598                         return "H"
599                 default:
600                         return ""
601                 }
602         }()
603         if psr.Hashing {
604                 ret += "H"
605         }
606         if psr.QueuedForHash {
607                 ret += "Q"
608         }
609         if psr.Marking {
610                 ret += "M"
611         }
612         if psr.Partial {
613                 ret += "P"
614         }
615         if psr.Complete {
616                 ret += "C"
617         }
618         if !psr.Ok {
619                 ret += "?"
620         }
621         return
622 }
623
624 func (t *Torrent) writeStatus(w io.Writer) {
625         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
626         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
627         if !t.haveInfo() {
628                 fmt.Fprintf(w, "Metadata have: ")
629                 for _, h := range t.metadataCompletedChunks {
630                         fmt.Fprintf(w, "%c", func() rune {
631                                 if h {
632                                         return 'H'
633                                 } else {
634                                         return '.'
635                                 }
636                         }())
637                 }
638                 fmt.Fprintln(w)
639         }
640         fmt.Fprintf(w, "Piece length: %s\n",
641                 func() string {
642                         if t.haveInfo() {
643                                 return fmt.Sprintf("%v (%v chunks)",
644                                         t.usualPieceSize(),
645                                         float64(t.usualPieceSize())/float64(t.chunkSize))
646                         } else {
647                                 return "no info"
648                         }
649                 }(),
650         )
651         if t.info != nil {
652                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
653                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
654                 fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
655                         for _, run := range t.pieceAvailabilityRuns() {
656                                 ret = append(ret, run.String())
657                         }
658                         return
659                 }(), " "))
660         }
661         fmt.Fprintf(w, "Reader Pieces:")
662         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
663                 fmt.Fprintf(w, " %d:%d", begin, end)
664                 return true
665         })
666         fmt.Fprintln(w)
667
668         fmt.Fprintf(w, "Enabled trackers:\n")
669         func() {
670                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
671                 fmt.Fprintf(tw, "    URL\tExtra\n")
672                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
673                         lu := l.URL()
674                         ru := r.URL()
675                         var luns, runs url.URL = *lu, *ru
676                         luns.Scheme = ""
677                         runs.Scheme = ""
678                         var ml missinggo.MultiLess
679                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
680                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
681                         return ml.Less()
682                 }).([]torrentTrackerAnnouncer) {
683                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
684                 }
685                 tw.Flush()
686         }()
687
688         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
689
690         spew.NewDefaultConfig()
691         spew.Fdump(w, t.statsLocked())
692
693         peers := t.peersAsSlice()
694         sort.Slice(peers, func(_i, _j int) bool {
695                 i := peers[_i]
696                 j := peers[_j]
697                 if less, ok := multiless.New().EagerSameLess(
698                         i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
699                 ).LessOk(); ok {
700                         return less
701                 }
702                 return worseConn(i, j)
703         })
704         for i, c := range peers {
705                 fmt.Fprintf(w, "%2d. ", i+1)
706                 c.writeStatus(w, t)
707         }
708 }
709
710 func (t *Torrent) haveInfo() bool {
711         return t.info != nil
712 }
713
714 // Returns a run-time generated MetaInfo that includes the info bytes and
715 // announce-list as currently known to the client.
716 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
717         return metainfo.MetaInfo{
718                 CreationDate: time.Now().Unix(),
719                 Comment:      "dynamic metainfo from client",
720                 CreatedBy:    "go.torrent",
721                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
722                 InfoBytes: func() []byte {
723                         if t.haveInfo() {
724                                 return t.metadataBytes
725                         } else {
726                                 return nil
727                         }
728                 }(),
729                 UrlList: func() []string {
730                         ret := make([]string, 0, len(t.webSeeds))
731                         for url := range t.webSeeds {
732                                 ret = append(ret, url)
733                         }
734                         return ret
735                 }(),
736         }
737 }
738
739 // Get bytes left
740 func (t *Torrent) BytesMissing() (n int64) {
741         t.cl.rLock()
742         n = t.bytesMissingLocked()
743         t.cl.rUnlock()
744         return
745 }
746
747 func (t *Torrent) bytesMissingLocked() int64 {
748         return t.bytesLeft()
749 }
750
751 func iterFlipped(b *roaring.Bitmap, end uint64, cb func(uint32) bool) {
752         roaring.Flip(b, 0, end).Iterate(cb)
753 }
754
755 func (t *Torrent) bytesLeft() (left int64) {
756         iterFlipped(&t._completedPieces, uint64(t.numPieces()), func(x uint32) bool {
757                 p := t.piece(pieceIndex(x))
758                 left += int64(p.length() - p.numDirtyBytes())
759                 return true
760         })
761         return
762 }
763
764 // Bytes left to give in tracker announces.
765 func (t *Torrent) bytesLeftAnnounce() int64 {
766         if t.haveInfo() {
767                 return t.bytesLeft()
768         } else {
769                 return -1
770         }
771 }
772
773 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
774         if t.pieceComplete(piece) {
775                 return false
776         }
777         if t.pieceAllDirty(piece) {
778                 return false
779         }
780         return t.pieces[piece].hasDirtyChunks()
781 }
782
783 func (t *Torrent) usualPieceSize() int {
784         return int(t.info.PieceLength)
785 }
786
787 func (t *Torrent) numPieces() pieceIndex {
788         return pieceIndex(t.info.NumPieces())
789 }
790
791 func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
792         return pieceIndex(t._completedPieces.GetCardinality())
793 }
794
795 func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
796         t.closed.Set()
797         if t.storage != nil {
798                 wg.Add(1)
799                 go func() {
800                         defer wg.Done()
801                         t.storageLock.Lock()
802                         defer t.storageLock.Unlock()
803                         if f := t.storage.Close; f != nil {
804                                 err1 := f()
805                                 if err1 != nil {
806                                         t.logger.WithDefaultLevel(log.Warning).Printf("error closing storage: %v", err1)
807                                 }
808                         }
809                 }()
810         }
811         t.iterPeers(func(p *Peer) {
812                 p.close()
813         })
814         t.pex.Reset()
815         t.cl.event.Broadcast()
816         t.pieceStateChanges.Close()
817         t.updateWantPeersEvent()
818         return
819 }
820
821 func (t *Torrent) requestOffset(r Request) int64 {
822         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
823 }
824
825 // Return the request that would include the given offset into the torrent data. Returns !ok if
826 // there is no such request.
827 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
828         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
829 }
830
831 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
832         defer perf.ScopeTimerErr(&err)()
833         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
834         if err == nil && n != len(data) {
835                 err = io.ErrShortWrite
836         }
837         return err
838 }
839
840 func (t *Torrent) bitfield() (bf []bool) {
841         bf = make([]bool, t.numPieces())
842         t._completedPieces.Iterate(func(piece uint32) (again bool) {
843                 bf[piece] = true
844                 return true
845         })
846         return
847 }
848
849 func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
850         return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
851 }
852
853 func (t *Torrent) chunksPerRegularPiece() uint32 {
854         return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
855 }
856
857 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
858         t.dirtyChunks.RemoveRange(
859                 uint64(t.pieceRequestIndexOffset(pieceIndex)),
860                 uint64(t.pieceRequestIndexOffset(pieceIndex+1)))
861 }
862
863 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
864         if t.info.PieceLength == 0 {
865                 // There will be no variance amongst pieces. Only pain.
866                 return 0
867         }
868         if piece == t.numPieces()-1 {
869                 ret := pp.Integer(*t.length % t.info.PieceLength)
870                 if ret != 0 {
871                         return ret
872                 }
873         }
874         return pp.Integer(t.info.PieceLength)
875 }
876
877 func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
878         p := t.piece(piece)
879         p.waitNoPendingWrites()
880         storagePiece := t.pieces[piece].Storage()
881
882         //Does the backend want to do its own hashing?
883         if i, ok := storagePiece.PieceImpl.(storage.SelfHashing); ok {
884                 var sum metainfo.Hash
885                 //log.Printf("A piece decided to self-hash: %d", piece)
886                 sum, err = i.SelfHash()
887                 missinggo.CopyExact(&ret, sum)
888                 return
889         }
890
891         hash := pieceHash.New()
892         const logPieceContents = false
893         if logPieceContents {
894                 var examineBuf bytes.Buffer
895                 _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
896                 log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
897         } else {
898                 _, err = storagePiece.WriteTo(hash)
899         }
900         missinggo.CopyExact(&ret, hash.Sum(nil))
901         return
902 }
903
904 func (t *Torrent) haveAnyPieces() bool {
905         return t._completedPieces.GetCardinality() != 0
906 }
907
908 func (t *Torrent) haveAllPieces() bool {
909         if !t.haveInfo() {
910                 return false
911         }
912         return t._completedPieces.GetCardinality() == bitmap.BitRange(t.numPieces())
913 }
914
915 func (t *Torrent) havePiece(index pieceIndex) bool {
916         return t.haveInfo() && t.pieceComplete(index)
917 }
918
919 func (t *Torrent) maybeDropMutuallyCompletePeer(
920         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's okay?
921         p *Peer,
922 ) {
923         if !t.cl.config.DropMutuallyCompletePeers {
924                 return
925         }
926         if !t.haveAllPieces() {
927                 return
928         }
929         if all, known := p.peerHasAllPieces(); !(known && all) {
930                 return
931         }
932         if p.useful() {
933                 return
934         }
935         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
936         p.drop()
937 }
938
939 func (t *Torrent) haveChunk(r Request) (ret bool) {
940         // defer func() {
941         //      log.Println("have chunk", r, ret)
942         // }()
943         if !t.haveInfo() {
944                 return false
945         }
946         if t.pieceComplete(pieceIndex(r.Index)) {
947                 return true
948         }
949         p := &t.pieces[r.Index]
950         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
951 }
952
953 func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
954         return chunkIndexType(cs.Begin / chunkSize)
955 }
956
957 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
958         // TODO: Are these overly conservative, should we be guarding this here?
959         {
960                 if !t.haveInfo() {
961                         return false
962                 }
963                 if index < 0 || index >= t.numPieces() {
964                         return false
965                 }
966         }
967         p := &t.pieces[index]
968         if p.queuedForHash() {
969                 return false
970         }
971         if p.hashing {
972                 return false
973         }
974         if t.pieceComplete(index) {
975                 return false
976         }
977         if t._pendingPieces.Contains(int(index)) {
978                 return true
979         }
980         // t.logger.Printf("piece %d not pending", index)
981         return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
982                 return index < begin || index >= end
983         })
984 }
985
986 // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
987 // conns (which is a map).
988 var peerConnSlices sync.Pool
989
990 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
991 // connection is one that usually sends us unwanted pieces, or has been in the worse half of the
992 // established connections for more than a minute. This is O(n log n). If there was a way to not
993 // consider the position of a conn relative to the total number, it could be reduced to O(n).
994 func (t *Torrent) worstBadConn() (ret *PeerConn) {
995         var sl []*PeerConn
996         getInterface := peerConnSlices.Get()
997         if getInterface == nil {
998                 sl = make([]*PeerConn, 0, len(t.conns))
999         } else {
1000                 sl = getInterface.([]*PeerConn)[:0]
1001         }
1002         sl = t.appendUnclosedConns(sl)
1003         defer peerConnSlices.Put(sl)
1004         wcs := worseConnSlice{sl}
1005         heap.Init(&wcs)
1006         for wcs.Len() != 0 {
1007                 c := heap.Pop(&wcs).(*PeerConn)
1008                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
1009                         return c
1010                 }
1011                 // If the connection is in the worst half of the established
1012                 // connection quota and is older than a minute.
1013                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
1014                         // Give connections 1 minute to prove themselves.
1015                         if time.Since(c.completedHandshake) > time.Minute {
1016                                 return c
1017                         }
1018                 }
1019         }
1020         return nil
1021 }
1022
1023 type PieceStateChange struct {
1024         Index int
1025         PieceState
1026 }
1027
1028 func (t *Torrent) publishPieceChange(piece pieceIndex) {
1029         t.cl._mu.Defer(func() {
1030                 cur := t.pieceState(piece)
1031                 p := &t.pieces[piece]
1032                 if cur != p.publicPieceState {
1033                         p.publicPieceState = cur
1034                         t.pieceStateChanges.Publish(PieceStateChange{
1035                                 int(piece),
1036                                 cur,
1037                         })
1038                 }
1039         })
1040 }
1041
1042 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
1043         if t.pieceComplete(piece) {
1044                 return 0
1045         }
1046         return pp.Integer(t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks())
1047 }
1048
1049 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1050         return t.pieces[piece].allChunksDirty()
1051 }
1052
1053 func (t *Torrent) readersChanged() {
1054         t.updateReaderPieces()
1055         t.updateAllPiecePriorities()
1056 }
1057
1058 func (t *Torrent) updateReaderPieces() {
1059         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1060 }
1061
1062 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1063         if from == to {
1064                 return
1065         }
1066         t.updateReaderPieces()
1067         // Order the ranges, high and low.
1068         l, h := from, to
1069         if l.begin > h.begin {
1070                 l, h = h, l
1071         }
1072         if l.end < h.begin {
1073                 // Two distinct ranges.
1074                 t.updatePiecePriorities(l.begin, l.end)
1075                 t.updatePiecePriorities(h.begin, h.end)
1076         } else {
1077                 // Ranges overlap.
1078                 end := l.end
1079                 if h.end > end {
1080                         end = h.end
1081                 }
1082                 t.updatePiecePriorities(l.begin, end)
1083         }
1084 }
1085
1086 func (t *Torrent) maybeNewConns() {
1087         // Tickle the accept routine.
1088         t.cl.event.Broadcast()
1089         t.openNewConns()
1090 }
1091
1092 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
1093         // t.logger.Printf("piece %d priority changed", piece)
1094         t.iterPeers(func(c *Peer) {
1095                 if c.updatePiecePriority(piece) {
1096                         // log.Print("conn piece priority changed")
1097                         c.updateRequests()
1098                 }
1099         })
1100         t.maybeNewConns()
1101         t.publishPieceChange(piece)
1102 }
1103
1104 func (t *Torrent) updatePiecePriority(piece pieceIndex) {
1105         p := &t.pieces[piece]
1106         newPrio := p.uncachedPriority()
1107         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1108         if newPrio == PiecePriorityNone {
1109                 if !t._pendingPieces.Remove(int(piece)) {
1110                         return
1111                 }
1112         } else {
1113                 if !t._pendingPieces.Set(int(piece), newPrio.BitmapPriority()) {
1114                         return
1115                 }
1116         }
1117         t.piecePriorityChanged(piece)
1118 }
1119
1120 func (t *Torrent) updateAllPiecePriorities() {
1121         t.updatePiecePriorities(0, t.numPieces())
1122 }
1123
1124 // Update all piece priorities in one hit. This function should have the same
1125 // output as updatePiecePriority, but across all pieces.
1126 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) {
1127         for i := begin; i < end; i++ {
1128                 t.updatePiecePriority(i)
1129         }
1130 }
1131
1132 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1133 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1134         if off >= *t.length {
1135                 return
1136         }
1137         if off < 0 {
1138                 size += off
1139                 off = 0
1140         }
1141         if size <= 0 {
1142                 return
1143         }
1144         begin = pieceIndex(off / t.info.PieceLength)
1145         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1146         if end > pieceIndex(t.info.NumPieces()) {
1147                 end = pieceIndex(t.info.NumPieces())
1148         }
1149         return
1150 }
1151
1152 // Returns true if all iterations complete without breaking. Returns the read
1153 // regions for all readers. The reader regions should not be merged as some
1154 // callers depend on this method to enumerate readers.
1155 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1156         for r := range t.readers {
1157                 p := r.pieces
1158                 if p.begin >= p.end {
1159                         continue
1160                 }
1161                 if !f(p.begin, p.end) {
1162                         return false
1163                 }
1164         }
1165         return true
1166 }
1167
1168 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1169         prio, ok := t._pendingPieces.GetPriority(piece)
1170         if !ok {
1171                 return PiecePriorityNone
1172         }
1173         if prio > 0 {
1174                 panic(prio)
1175         }
1176         ret := piecePriority(-prio)
1177         if ret == PiecePriorityNone {
1178                 panic(piece)
1179         }
1180         return ret
1181 }
1182
1183 func (t *Torrent) pendRequest(req RequestIndex) {
1184         t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
1185 }
1186
1187 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
1188         t.cl.event.Broadcast()
1189         if t.pieceComplete(piece) {
1190                 t.onPieceCompleted(piece)
1191         } else {
1192                 t.onIncompletePiece(piece)
1193         }
1194         t.updatePiecePriority(piece)
1195 }
1196
1197 func (t *Torrent) numReceivedConns() (ret int) {
1198         for c := range t.conns {
1199                 if c.Discovery == PeerSourceIncoming {
1200                         ret++
1201                 }
1202         }
1203         return
1204 }
1205
1206 func (t *Torrent) maxHalfOpen() int {
1207         // Note that if we somehow exceed the maximum established conns, we want
1208         // the negative value to have an effect.
1209         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1210         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1211         // We want to allow some experimentation with new peers, and to try to
1212         // upset an oversupply of received connections.
1213         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1214 }
1215
1216 func (t *Torrent) openNewConns() (initiated int) {
1217         defer t.updateWantPeersEvent()
1218         for t.peers.Len() != 0 {
1219                 if !t.wantConns() {
1220                         return
1221                 }
1222                 if len(t.halfOpen) >= t.maxHalfOpen() {
1223                         return
1224                 }
1225                 if len(t.cl.dialers) == 0 {
1226                         return
1227                 }
1228                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1229                         return
1230                 }
1231                 p := t.peers.PopMax()
1232                 t.initiateConn(p)
1233                 initiated++
1234         }
1235         return
1236 }
1237
1238 func (t *Torrent) getConnPieceInclination() []int {
1239         _ret := t.connPieceInclinationPool.Get()
1240         if _ret == nil {
1241                 pieceInclinationsNew.Add(1)
1242                 return rand.Perm(int(t.numPieces()))
1243         }
1244         pieceInclinationsReused.Add(1)
1245         return *_ret.(*[]int)
1246 }
1247
1248 func (t *Torrent) putPieceInclination(pi []int) {
1249         t.connPieceInclinationPool.Put(&pi)
1250         pieceInclinationsPut.Add(1)
1251 }
1252
1253 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1254         p := t.piece(piece)
1255         uncached := t.pieceCompleteUncached(piece)
1256         cached := p.completion()
1257         changed := cached != uncached
1258         complete := uncached.Complete
1259         p.storageCompletionOk = uncached.Ok
1260         x := uint32(piece)
1261         if complete {
1262                 t._completedPieces.Add(x)
1263         } else {
1264                 t._completedPieces.Remove(x)
1265         }
1266         if complete && len(p.dirtiers) != 0 {
1267                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1268         }
1269         if changed {
1270                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger)
1271                 t.pieceCompletionChanged(piece)
1272         }
1273         return changed
1274 }
1275
1276 // Non-blocking read. Client lock is not required.
1277 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1278         for len(b) != 0 {
1279                 p := &t.pieces[off/t.info.PieceLength]
1280                 p.waitNoPendingWrites()
1281                 var n1 int
1282                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1283                 if n1 == 0 {
1284                         break
1285                 }
1286                 off += int64(n1)
1287                 n += n1
1288                 b = b[n1:]
1289         }
1290         return
1291 }
1292
1293 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1294 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1295 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1296 func (t *Torrent) maybeCompleteMetadata() error {
1297         if t.haveInfo() {
1298                 // Nothing to do.
1299                 return nil
1300         }
1301         if !t.haveAllMetadataPieces() {
1302                 // Don't have enough metadata pieces.
1303                 return nil
1304         }
1305         err := t.setInfoBytesLocked(t.metadataBytes)
1306         if err != nil {
1307                 t.invalidateMetadata()
1308                 return fmt.Errorf("error setting info bytes: %s", err)
1309         }
1310         if t.cl.config.Debug {
1311                 t.logger.Printf("%s: got metadata from peers", t)
1312         }
1313         return nil
1314 }
1315
1316 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1317         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1318                 if end > begin {
1319                         now.Add(bitmap.BitIndex(begin))
1320                         readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
1321                 }
1322                 return true
1323         })
1324         return
1325 }
1326
1327 func (t *Torrent) needData() bool {
1328         if t.closed.IsSet() {
1329                 return false
1330         }
1331         if !t.haveInfo() {
1332                 return true
1333         }
1334         return t._pendingPieces.Len() != 0
1335 }
1336
1337 func appendMissingStrings(old, new []string) (ret []string) {
1338         ret = old
1339 new:
1340         for _, n := range new {
1341                 for _, o := range old {
1342                         if o == n {
1343                                 continue new
1344                         }
1345                 }
1346                 ret = append(ret, n)
1347         }
1348         return
1349 }
1350
1351 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1352         ret = existing
1353         for minNumTiers > len(ret) {
1354                 ret = append(ret, nil)
1355         }
1356         return
1357 }
1358
1359 func (t *Torrent) addTrackers(announceList [][]string) {
1360         fullAnnounceList := &t.metainfo.AnnounceList
1361         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1362         for tierIndex, trackerURLs := range announceList {
1363                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1364         }
1365         t.startMissingTrackerScrapers()
1366         t.updateWantPeersEvent()
1367 }
1368
1369 // Don't call this before the info is available.
1370 func (t *Torrent) bytesCompleted() int64 {
1371         if !t.haveInfo() {
1372                 return 0
1373         }
1374         return *t.length - t.bytesLeft()
1375 }
1376
1377 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1378         t.cl.lock()
1379         defer t.cl.unlock()
1380         return t.setInfoBytesLocked(b)
1381 }
1382
1383 // Returns true if connection is removed from torrent.Conns.
1384 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1385         if !c.closed.IsSet() {
1386                 panic("connection is not closed")
1387                 // There are behaviours prevented by the closed state that will fail
1388                 // if the connection has been deleted.
1389         }
1390         _, ret = t.conns[c]
1391         delete(t.conns, c)
1392         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1393         // the drop event against the PexConnState instead.
1394         if ret {
1395                 if !t.cl.config.DisablePEX {
1396                         t.pex.Drop(c)
1397                 }
1398         }
1399         torrent.Add("deleted connections", 1)
1400         c.deleteAllRequests()
1401         if t.numActivePeers() == 0 {
1402                 t.assertNoPendingRequests()
1403         }
1404         return
1405 }
1406
1407 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1408         if !t.haveInfo() {
1409                 return
1410         }
1411         p.newPeerPieces().Iterate(func(i uint32) bool {
1412                 p.t.decPieceAvailability(pieceIndex(i))
1413                 return true
1414         })
1415 }
1416
1417 func (t *Torrent) numActivePeers() (num int) {
1418         t.iterPeers(func(*Peer) {
1419                 num++
1420         })
1421         return
1422 }
1423
1424 func (t *Torrent) assertNoPendingRequests() {
1425         if len(t.pendingRequests) != 0 {
1426                 panic(t.pendingRequests)
1427         }
1428         //if len(t.lastRequested) != 0 {
1429         //      panic(t.lastRequested)
1430         //}
1431 }
1432
1433 func (t *Torrent) dropConnection(c *PeerConn) {
1434         t.cl.event.Broadcast()
1435         c.close()
1436         if t.deletePeerConn(c) {
1437                 t.openNewConns()
1438         }
1439 }
1440
1441 func (t *Torrent) wantPeers() bool {
1442         if t.closed.IsSet() {
1443                 return false
1444         }
1445         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1446                 return false
1447         }
1448         return t.needData() || t.seeding()
1449 }
1450
1451 func (t *Torrent) updateWantPeersEvent() {
1452         if t.wantPeers() {
1453                 t.wantPeersEvent.Set()
1454         } else {
1455                 t.wantPeersEvent.Clear()
1456         }
1457 }
1458
1459 // Returns whether the client should make effort to seed the torrent.
1460 func (t *Torrent) seeding() bool {
1461         cl := t.cl
1462         if t.closed.IsSet() {
1463                 return false
1464         }
1465         if t.dataUploadDisallowed {
1466                 return false
1467         }
1468         if cl.config.NoUpload {
1469                 return false
1470         }
1471         if !cl.config.Seed {
1472                 return false
1473         }
1474         if cl.config.DisableAggressiveUpload && t.needData() {
1475                 return false
1476         }
1477         return true
1478 }
1479
1480 func (t *Torrent) onWebRtcConn(
1481         c datachannel.ReadWriteCloser,
1482         dcc webtorrent.DataChannelContext,
1483 ) {
1484         defer c.Close()
1485         pc, err := t.cl.initiateProtocolHandshakes(
1486                 context.Background(),
1487                 webrtcNetConn{c, dcc},
1488                 t,
1489                 dcc.LocalOffered,
1490                 false,
1491                 webrtcNetAddr{dcc.Remote},
1492                 webrtcNetwork,
1493                 fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
1494         )
1495         if err != nil {
1496                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1497                 return
1498         }
1499         if dcc.LocalOffered {
1500                 pc.Discovery = PeerSourceTracker
1501         } else {
1502                 pc.Discovery = PeerSourceIncoming
1503         }
1504         t.cl.lock()
1505         defer t.cl.unlock()
1506         err = t.cl.runHandshookConn(pc, t)
1507         if err != nil {
1508                 t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
1509         }
1510 }
1511
1512 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1513         err := t.cl.runHandshookConn(pc, t)
1514         if err != nil || logAll {
1515                 t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
1516         }
1517 }
1518
1519 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1520         t.logRunHandshookConn(pc, false, log.Debug)
1521 }
1522
1523 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1524         wtc, release := t.cl.websocketTrackers.Get(u.String())
1525         go func() {
1526                 <-t.closed.Done()
1527                 release()
1528         }()
1529         wst := websocketTrackerStatus{u, wtc}
1530         go func() {
1531                 err := wtc.Announce(tracker.Started, t.infoHash)
1532                 if err != nil {
1533                         t.logger.WithDefaultLevel(log.Warning).Printf(
1534                                 "error in initial announce to %q: %v",
1535                                 u.String(), err,
1536                         )
1537                 }
1538         }()
1539         return wst
1540
1541 }
1542
1543 func (t *Torrent) startScrapingTracker(_url string) {
1544         if _url == "" {
1545                 return
1546         }
1547         u, err := url.Parse(_url)
1548         if err != nil {
1549                 // URLs with a leading '*' appear to be a uTorrent convention to
1550                 // disable trackers.
1551                 if _url[0] != '*' {
1552                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1553                 }
1554                 return
1555         }
1556         if u.Scheme == "udp" {
1557                 u.Scheme = "udp4"
1558                 t.startScrapingTracker(u.String())
1559                 u.Scheme = "udp6"
1560                 t.startScrapingTracker(u.String())
1561                 return
1562         }
1563         if _, ok := t.trackerAnnouncers[_url]; ok {
1564                 return
1565         }
1566         sl := func() torrentTrackerAnnouncer {
1567                 switch u.Scheme {
1568                 case "ws", "wss":
1569                         if t.cl.config.DisableWebtorrent {
1570                                 return nil
1571                         }
1572                         return t.startWebsocketAnnouncer(*u)
1573                 case "udp4":
1574                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1575                                 return nil
1576                         }
1577                 case "udp6":
1578                         if t.cl.config.DisableIPv6 {
1579                                 return nil
1580                         }
1581                 }
1582                 newAnnouncer := &trackerScraper{
1583                         u: *u,
1584                         t: t,
1585                 }
1586                 go newAnnouncer.Run()
1587                 return newAnnouncer
1588         }()
1589         if sl == nil {
1590                 return
1591         }
1592         if t.trackerAnnouncers == nil {
1593                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1594         }
1595         t.trackerAnnouncers[_url] = sl
1596 }
1597
1598 // Adds and starts tracker scrapers for tracker URLs that aren't already
1599 // running.
1600 func (t *Torrent) startMissingTrackerScrapers() {
1601         if t.cl.config.DisableTrackers {
1602                 return
1603         }
1604         t.startScrapingTracker(t.metainfo.Announce)
1605         for _, tier := range t.metainfo.AnnounceList {
1606                 for _, url := range tier {
1607                         t.startScrapingTracker(url)
1608                 }
1609         }
1610 }
1611
1612 // Returns an AnnounceRequest with fields filled out to defaults and current
1613 // values.
1614 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1615         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1616         // dependent on the network in use.
1617         return tracker.AnnounceRequest{
1618                 Event: event,
1619                 NumWant: func() int32 {
1620                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1621                                 return -1
1622                         } else {
1623                                 return 0
1624                         }
1625                 }(),
1626                 Port:     uint16(t.cl.incomingPeerPort()),
1627                 PeerId:   t.cl.peerID,
1628                 InfoHash: t.infoHash,
1629                 Key:      t.cl.announceKey(),
1630
1631                 // The following are vaguely described in BEP 3.
1632
1633                 Left:     t.bytesLeftAnnounce(),
1634                 Uploaded: t.stats.BytesWrittenData.Int64(),
1635                 // There's no mention of wasted or unwanted download in the BEP.
1636                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1637         }
1638 }
1639
1640 // Adds peers revealed in an announce until the announce ends, or we have
1641 // enough peers.
1642 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1643         cl := t.cl
1644         for v := range pvs {
1645                 cl.lock()
1646                 added := 0
1647                 for _, cp := range v.Peers {
1648                         if cp.Port == 0 {
1649                                 // Can't do anything with this.
1650                                 continue
1651                         }
1652                         if t.addPeer(PeerInfo{
1653                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1654                                 Source: PeerSourceDhtGetPeers,
1655                         }) {
1656                                 added++
1657                         }
1658                 }
1659                 cl.unlock()
1660                 // if added != 0 {
1661                 //      log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1662                 // }
1663         }
1664 }
1665
1666 // Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
1667 // announce ends. stop will force the announce to end.
1668 func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
1669         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
1670         if err != nil {
1671                 return
1672         }
1673         _done := make(chan struct{})
1674         done = _done
1675         stop = ps.Close
1676         go func() {
1677                 t.consumeDhtAnnouncePeers(ps.Peers())
1678                 close(_done)
1679         }()
1680         return
1681 }
1682
1683 func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
1684         _, stop, err := t.AnnounceToDht(s)
1685         if err != nil {
1686                 return err
1687         }
1688         select {
1689         case <-t.closed.Done():
1690         case <-time.After(5 * time.Minute):
1691         }
1692         stop()
1693         return nil
1694 }
1695
1696 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1697         cl := t.cl
1698         cl.lock()
1699         defer cl.unlock()
1700         for {
1701                 for {
1702                         if t.closed.IsSet() {
1703                                 return
1704                         }
1705                         if !t.wantPeers() {
1706                                 goto wait
1707                         }
1708                         // TODO: Determine if there's a listener on the port we're announcing.
1709                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1710                                 goto wait
1711                         }
1712                         break
1713                 wait:
1714                         cl.event.Wait()
1715                 }
1716                 func() {
1717                         t.numDHTAnnounces++
1718                         cl.unlock()
1719                         defer cl.lock()
1720                         err := t.timeboxedAnnounceToDht(s)
1721                         if err != nil {
1722                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1723                         }
1724                 }()
1725         }
1726 }
1727
1728 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1729         for _, p := range peers {
1730                 if t.addPeer(p) {
1731                         added++
1732                 }
1733         }
1734         return
1735 }
1736
1737 // The returned TorrentStats may require alignment in memory. See
1738 // https://github.com/anacrolix/torrent/issues/383.
1739 func (t *Torrent) Stats() TorrentStats {
1740         t.cl.rLock()
1741         defer t.cl.rUnlock()
1742         return t.statsLocked()
1743 }
1744
1745 func (t *Torrent) statsLocked() (ret TorrentStats) {
1746         ret.ActivePeers = len(t.conns)
1747         ret.HalfOpenPeers = len(t.halfOpen)
1748         ret.PendingPeers = t.peers.Len()
1749         ret.TotalPeers = t.numTotalPeers()
1750         ret.ConnectedSeeders = 0
1751         for c := range t.conns {
1752                 if all, ok := c.peerHasAllPieces(); all && ok {
1753                         ret.ConnectedSeeders++
1754                 }
1755         }
1756         ret.ConnStats = t.stats.Copy()
1757         return
1758 }
1759
1760 // The total number of peers in the torrent.
1761 func (t *Torrent) numTotalPeers() int {
1762         peers := make(map[string]struct{})
1763         for conn := range t.conns {
1764                 ra := conn.conn.RemoteAddr()
1765                 if ra == nil {
1766                         // It's been closed and doesn't support RemoteAddr.
1767                         continue
1768                 }
1769                 peers[ra.String()] = struct{}{}
1770         }
1771         for addr := range t.halfOpen {
1772                 peers[addr] = struct{}{}
1773         }
1774         t.peers.Each(func(peer PeerInfo) {
1775                 peers[peer.Addr.String()] = struct{}{}
1776         })
1777         return len(peers)
1778 }
1779
1780 // Reconcile bytes transferred before connection was associated with a
1781 // torrent.
1782 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1783         if c._stats != (ConnStats{
1784                 // Handshakes should only increment these fields:
1785                 BytesWritten: c._stats.BytesWritten,
1786                 BytesRead:    c._stats.BytesRead,
1787         }) {
1788                 panic("bad stats")
1789         }
1790         c.postHandshakeStats(func(cs *ConnStats) {
1791                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1792                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1793         })
1794         c.reconciledHandshakeStats = true
1795 }
1796
1797 // Returns true if the connection is added.
1798 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1799         defer func() {
1800                 if err == nil {
1801                         torrent.Add("added connections", 1)
1802                 }
1803         }()
1804         if t.closed.IsSet() {
1805                 return errors.New("torrent closed")
1806         }
1807         for c0 := range t.conns {
1808                 if c.PeerID != c0.PeerID {
1809                         continue
1810                 }
1811                 if !t.cl.config.DropDuplicatePeerIds {
1812                         continue
1813                 }
1814                 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1815                         c0.close()
1816                         t.deletePeerConn(c0)
1817                 } else {
1818                         return errors.New("existing connection preferred")
1819                 }
1820         }
1821         if len(t.conns) >= t.maxEstablishedConns {
1822                 c := t.worstBadConn()
1823                 if c == nil {
1824                         return errors.New("don't want conns")
1825                 }
1826                 c.close()
1827                 t.deletePeerConn(c)
1828         }
1829         if len(t.conns) >= t.maxEstablishedConns {
1830                 panic(len(t.conns))
1831         }
1832         t.conns[c] = struct{}{}
1833         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
1834                 t.pex.Add(c) // as no further extended handshake expected
1835         }
1836         return nil
1837 }
1838
1839 func (t *Torrent) wantConns() bool {
1840         if !t.networkingEnabled.Bool() {
1841                 return false
1842         }
1843         if t.closed.IsSet() {
1844                 return false
1845         }
1846         if !t.seeding() && !t.needData() {
1847                 return false
1848         }
1849         if len(t.conns) < t.maxEstablishedConns {
1850                 return true
1851         }
1852         return t.worstBadConn() != nil
1853 }
1854
1855 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1856         t.cl.lock()
1857         defer t.cl.unlock()
1858         oldMax = t.maxEstablishedConns
1859         t.maxEstablishedConns = max
1860         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
1861                 return worseConn(&l.Peer, &r.Peer)
1862         })
1863         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1864                 t.dropConnection(wcs.Pop().(*PeerConn))
1865         }
1866         t.openNewConns()
1867         return oldMax
1868 }
1869
1870 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
1871         t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, passed).SetLevel(log.Debug))
1872         p := t.piece(piece)
1873         p.numVerifies++
1874         t.cl.event.Broadcast()
1875         if t.closed.IsSet() {
1876                 return
1877         }
1878
1879         // Don't score the first time a piece is hashed, it could be an initial check.
1880         if p.storageCompletionOk {
1881                 if passed {
1882                         pieceHashedCorrect.Add(1)
1883                 } else {
1884                         log.Fmsg(
1885                                 "piece %d failed hash: %d connections contributed", piece, len(p.dirtiers),
1886                         ).AddValues(t, p).SetLevel(log.Debug).Log(t.logger)
1887                         pieceHashedNotCorrect.Add(1)
1888                 }
1889         }
1890
1891         p.marking = true
1892         t.publishPieceChange(piece)
1893         defer func() {
1894                 p.marking = false
1895                 t.publishPieceChange(piece)
1896         }()
1897
1898         if passed {
1899                 if len(p.dirtiers) != 0 {
1900                         // Don't increment stats above connection-level for every involved connection.
1901                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1902                 }
1903                 for c := range p.dirtiers {
1904                         c._stats.incrementPiecesDirtiedGood()
1905                 }
1906                 t.clearPieceTouchers(piece)
1907                 t.cl.unlock()
1908                 err := p.Storage().MarkComplete()
1909                 if err != nil {
1910                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1911                 }
1912                 t.cl.lock()
1913
1914                 if t.closed.IsSet() {
1915                         return
1916                 }
1917                 t.pendAllChunkSpecs(piece)
1918         } else {
1919                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
1920                         // Peers contributed to all the data for this piece hash failure, and the failure was
1921                         // not due to errors in the storage (such as data being dropped in a cache).
1922
1923                         // Increment Torrent and above stats, and then specific connections.
1924                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1925                         for c := range p.dirtiers {
1926                                 // Y u do dis peer?!
1927                                 c.stats().incrementPiecesDirtiedBad()
1928                         }
1929
1930                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
1931                         for c := range p.dirtiers {
1932                                 if !c.trusted {
1933                                         bannableTouchers = append(bannableTouchers, c)
1934                                 }
1935                         }
1936                         t.clearPieceTouchers(piece)
1937                         slices.Sort(bannableTouchers, connLessTrusted)
1938
1939                         if t.cl.config.Debug {
1940                                 t.logger.Printf(
1941                                         "bannable conns by trust for piece %d: %v",
1942                                         piece,
1943                                         func() (ret []connectionTrust) {
1944                                                 for _, c := range bannableTouchers {
1945                                                         ret = append(ret, c.trust())
1946                                                 }
1947                                                 return
1948                                         }(),
1949                                 )
1950                         }
1951
1952                         if len(bannableTouchers) >= 1 {
1953                                 c := bannableTouchers[0]
1954                                 t.cl.banPeerIP(c.remoteIp())
1955                                 c.drop()
1956                         }
1957                 }
1958                 t.onIncompletePiece(piece)
1959                 p.Storage().MarkNotComplete()
1960         }
1961         t.updatePieceCompletion(piece)
1962 }
1963
1964 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
1965         // TODO: Make faster
1966         for cn := range t.conns {
1967                 cn.tickleWriter()
1968         }
1969 }
1970
1971 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
1972         t.pendAllChunkSpecs(piece)
1973         t.cancelRequestsForPiece(piece)
1974         t.piece(piece).readerCond.Broadcast()
1975         for conn := range t.conns {
1976                 conn.have(piece)
1977                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
1978         }
1979 }
1980
1981 // Called when a piece is found to be not complete.
1982 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
1983         if t.pieceAllDirty(piece) {
1984                 t.pendAllChunkSpecs(piece)
1985         }
1986         if !t.wantPieceIndex(piece) {
1987                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
1988                 return
1989         }
1990         // We could drop any connections that we told we have a piece that we
1991         // don't here. But there's a test failure, and it seems clients don't care
1992         // if you request pieces that you already claim to have. Pruning bad
1993         // connections might just remove any connections that aren't treating us
1994         // favourably anyway.
1995
1996         // for c := range t.conns {
1997         //      if c.sentHave(piece) {
1998         //              c.drop()
1999         //      }
2000         // }
2001         t.iterPeers(func(conn *Peer) {
2002                 if conn.peerHasPiece(piece) {
2003                         conn.updateRequests()
2004                 }
2005         })
2006 }
2007
2008 func (t *Torrent) tryCreateMorePieceHashers() {
2009         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
2010         }
2011 }
2012
2013 func (t *Torrent) tryCreatePieceHasher() bool {
2014         if t.storage == nil {
2015                 return false
2016         }
2017         pi, ok := t.getPieceToHash()
2018         if !ok {
2019                 return false
2020         }
2021         p := t.piece(pi)
2022         t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
2023         p.hashing = true
2024         t.publishPieceChange(pi)
2025         t.updatePiecePriority(pi)
2026         t.storageLock.RLock()
2027         t.activePieceHashes++
2028         go t.pieceHasher(pi)
2029         return true
2030 }
2031
2032 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
2033         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
2034                 if t.piece(i).hashing {
2035                         return true
2036                 }
2037                 ret = i
2038                 ok = true
2039                 return false
2040         })
2041         return
2042 }
2043
2044 func (t *Torrent) pieceHasher(index pieceIndex) {
2045         p := t.piece(index)
2046         sum, copyErr := t.hashPiece(index)
2047         correct := sum == *p.hash
2048         switch copyErr {
2049         case nil, io.EOF:
2050         default:
2051                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
2052         }
2053         t.storageLock.RUnlock()
2054         t.cl.lock()
2055         defer t.cl.unlock()
2056         p.hashing = false
2057         t.updatePiecePriority(index)
2058         t.pieceHashed(index, correct, copyErr)
2059         t.publishPieceChange(index)
2060         t.activePieceHashes--
2061         t.tryCreateMorePieceHashers()
2062 }
2063
2064 // Return the connections that touched a piece, and clear the entries while doing it.
2065 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
2066         p := t.piece(pi)
2067         for c := range p.dirtiers {
2068                 delete(c.peerTouchedPieces, pi)
2069                 delete(p.dirtiers, c)
2070         }
2071 }
2072
2073 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2074         t.iterPeers(func(p *Peer) {
2075                 ret = append(ret, p)
2076         })
2077         return
2078 }
2079
2080 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2081         piece := t.piece(pieceIndex)
2082         if piece.queuedForHash() {
2083                 return
2084         }
2085         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2086         t.publishPieceChange(pieceIndex)
2087         t.updatePiecePriority(pieceIndex)
2088         t.tryCreateMorePieceHashers()
2089 }
2090
2091 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2092 // before the Info is available.
2093 func (t *Torrent) VerifyData() {
2094         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2095                 t.Piece(i).VerifyData()
2096         }
2097 }
2098
2099 // Start the process of connecting to the given peer for the given torrent if appropriate.
2100 func (t *Torrent) initiateConn(peer PeerInfo) {
2101         if peer.Id == t.cl.peerID {
2102                 return
2103         }
2104         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2105                 return
2106         }
2107         addr := peer.Addr
2108         if t.addrActive(addr.String()) {
2109                 return
2110         }
2111         t.cl.numHalfOpen++
2112         t.halfOpen[addr.String()] = peer
2113         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
2114 }
2115
2116 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2117 // quickly make one Client visible to the Torrent of another Client.
2118 func (t *Torrent) AddClientPeer(cl *Client) int {
2119         return t.AddPeers(func() (ps []PeerInfo) {
2120                 for _, la := range cl.ListenAddrs() {
2121                         ps = append(ps, PeerInfo{
2122                                 Addr:    la,
2123                                 Trusted: true,
2124                         })
2125                 }
2126                 return
2127         }())
2128 }
2129
2130 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2131 // connection.
2132 func (t *Torrent) allStats(f func(*ConnStats)) {
2133         f(&t.stats)
2134         f(&t.cl.stats)
2135 }
2136
2137 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2138         return t.pieces[i].hashing
2139 }
2140
2141 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2142         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2143 }
2144
2145 func (t *Torrent) dialTimeout() time.Duration {
2146         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2147 }
2148
2149 func (t *Torrent) piece(i int) *Piece {
2150         return &t.pieces[i]
2151 }
2152
2153 func (t *Torrent) onWriteChunkErr(err error) {
2154         if t.userOnWriteChunkErr != nil {
2155                 go t.userOnWriteChunkErr(err)
2156                 return
2157         }
2158         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2159         t.disallowDataDownloadLocked()
2160 }
2161
2162 func (t *Torrent) DisallowDataDownload() {
2163         t.disallowDataDownloadLocked()
2164 }
2165
2166 func (t *Torrent) disallowDataDownloadLocked() {
2167         t.dataDownloadDisallowed.Set()
2168 }
2169
2170 func (t *Torrent) AllowDataDownload() {
2171         t.dataDownloadDisallowed.Clear()
2172 }
2173
2174 // Enables uploading data, if it was disabled.
2175 func (t *Torrent) AllowDataUpload() {
2176         t.cl.lock()
2177         defer t.cl.unlock()
2178         t.dataUploadDisallowed = false
2179         for c := range t.conns {
2180                 c.updateRequests()
2181         }
2182 }
2183
2184 // Disables uploading data, if it was enabled.
2185 func (t *Torrent) DisallowDataUpload() {
2186         t.cl.lock()
2187         defer t.cl.unlock()
2188         t.dataUploadDisallowed = true
2189         for c := range t.conns {
2190                 c.updateRequests()
2191         }
2192 }
2193
2194 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2195 // or if nil, a critical message is logged, and data download is disabled.
2196 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2197         t.cl.lock()
2198         defer t.cl.unlock()
2199         t.userOnWriteChunkErr = f
2200 }
2201
2202 func (t *Torrent) iterPeers(f func(p *Peer)) {
2203         for pc := range t.conns {
2204                 f(&pc.Peer)
2205         }
2206         for _, ws := range t.webSeeds {
2207                 f(ws)
2208         }
2209 }
2210
2211 func (t *Torrent) callbacks() *Callbacks {
2212         return &t.cl.config.Callbacks
2213 }
2214
2215 var WebseedHttpClient = &http.Client{
2216         Transport: &http.Transport{
2217                 MaxConnsPerHost: 10,
2218         },
2219 }
2220
2221 func (t *Torrent) addWebSeed(url string) {
2222         if t.cl.config.DisableWebseeds {
2223                 return
2224         }
2225         if _, ok := t.webSeeds[url]; ok {
2226                 return
2227         }
2228         const maxRequests = 10
2229         ws := webseedPeer{
2230                 peer: Peer{
2231                         t:                        t,
2232                         outgoing:                 true,
2233                         Network:                  "http",
2234                         reconciledHandshakeStats: true,
2235                         // TODO: Raise this limit, and instead limit concurrent fetches.
2236                         PeerMaxRequests: 32,
2237                         RemoteAddr:      remoteAddrFromUrl(url),
2238                         callbacks:       t.callbacks(),
2239                 },
2240                 client: webseed.Client{
2241                         // Consider a MaxConnsPerHost in the transport for this, possibly in a global Client.
2242                         HttpClient: WebseedHttpClient,
2243                         Url:        url,
2244                 },
2245                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2246         }
2247         ws.requesterCond.L = t.cl.locker()
2248         for i := 0; i < maxRequests; i += 1 {
2249                 go ws.requester()
2250         }
2251         for _, f := range t.callbacks().NewPeer {
2252                 f(&ws.peer)
2253         }
2254         ws.peer.logger = t.logger.WithContextValue(&ws)
2255         ws.peer.peerImpl = &ws
2256         if t.haveInfo() {
2257                 ws.onGotInfo(t.info)
2258         }
2259         t.webSeeds[url] = &ws.peer
2260         ws.peer.onPeerHasAllPieces()
2261 }
2262
2263 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2264         t.iterPeers(func(p1 *Peer) {
2265                 if p1 == p {
2266                         active = true
2267                 }
2268         })
2269         return
2270 }
2271
2272 func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
2273         index := ri / t.chunksPerRegularPiece()
2274         return Request{
2275                 pp.Integer(index),
2276                 t.piece(int(index)).chunkIndexSpec(ri % t.chunksPerRegularPiece()),
2277         }
2278 }
2279
2280 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
2281         return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
2282 }
2283
2284 func (t *Torrent) numChunks() RequestIndex {
2285         return RequestIndex((t.Length() + int64(t.chunkSize) - 1) / int64(t.chunkSize))
2286 }
2287
2288 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
2289         return RequestIndex(piece) * t.chunksPerRegularPiece()
2290 }