]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Forgot to remove initial have all state for webseeds
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "bytes"
5         "container/heap"
6         "context"
7         "crypto/sha1"
8         "errors"
9         "fmt"
10         "io"
11         "math/rand"
12         "net/http"
13         "net/url"
14         "sort"
15         "sync"
16         "text/tabwriter"
17         "time"
18         "unsafe"
19
20         "github.com/anacrolix/multiless"
21         "github.com/anacrolix/torrent/common"
22         "github.com/anacrolix/torrent/segments"
23         "github.com/anacrolix/torrent/webseed"
24         "github.com/davecgh/go-spew/spew"
25         "github.com/pion/datachannel"
26
27         "github.com/anacrolix/dht/v2"
28         "github.com/anacrolix/log"
29         "github.com/anacrolix/missinggo"
30         "github.com/anacrolix/missinggo/iter"
31         "github.com/anacrolix/missinggo/perf"
32         "github.com/anacrolix/missinggo/pubsub"
33         "github.com/anacrolix/missinggo/slices"
34         "github.com/anacrolix/missinggo/v2/bitmap"
35         "github.com/anacrolix/missinggo/v2/prioritybitmap"
36
37         "github.com/anacrolix/torrent/bencode"
38         "github.com/anacrolix/torrent/metainfo"
39         pp "github.com/anacrolix/torrent/peer_protocol"
40         "github.com/anacrolix/torrent/storage"
41         "github.com/anacrolix/torrent/tracker"
42         "github.com/anacrolix/torrent/webtorrent"
43 )
44
45 // Maintains state of torrent within a Client. Many methods should not be called before the info is
46 // available, see .Info and .GotInfo.
47 type Torrent struct {
48         // Torrent-level aggregate statistics. First in struct to ensure 64-bit
49         // alignment. See #262.
50         stats  ConnStats
51         cl     *Client
52         logger log.Logger
53
54         networkingEnabled      bool
55         dataDownloadDisallowed bool
56         dataUploadDisallowed   bool
57         userOnWriteChunkErr    func(error)
58
59         closed   missinggo.Event
60         infoHash metainfo.Hash
61         pieces   []Piece
62         // Values are the piece indices that changed.
63         pieceStateChanges *pubsub.PubSub
64         // The size of chunks to request from peers over the wire. This is
65         // normally 16KiB by convention these days.
66         chunkSize pp.Integer
67         chunkPool *sync.Pool
68         // Total length of the torrent in bytes. Stored because it's not O(1) to
69         // get this from the info dict.
70         length *int64
71
72         // The storage to open when the info dict becomes available.
73         storageOpener *storage.Client
74         // Storage for torrent data.
75         storage *storage.Torrent
76         // Read-locked for using storage, and write-locked for Closing.
77         storageLock sync.RWMutex
78
79         // TODO: Only announce stuff is used?
80         metainfo metainfo.MetaInfo
81
82         // The info dict. nil if we don't have it (yet).
83         info      *metainfo.Info
84         fileIndex segments.Index
85         files     *[]*File
86
87         webSeeds map[string]*Peer
88
89         // Active peer connections, running message stream loops. TODO: Make this
90         // open (not-closed) connections only.
91         conns               map[*PeerConn]struct{}
92         maxEstablishedConns int
93         // Set of addrs to which we're attempting to connect. Connections are
94         // half-open until all handshakes are completed.
95         halfOpen map[string]PeerInfo
96
97         // Reserve of peers to connect to. A peer can be both here and in the
98         // active connections if were told about the peer after connecting with
99         // them. That encourages us to reconnect to peers that are well known in
100         // the swarm.
101         peers prioritizedPeers
102         // Whether we want to know to know more peers.
103         wantPeersEvent missinggo.Event
104         // An announcer for each tracker URL.
105         trackerAnnouncers map[string]torrentTrackerAnnouncer
106         // How many times we've initiated a DHT announce. TODO: Move into stats.
107         numDHTAnnounces int
108
109         // Name used if the info name isn't available. Should be cleared when the
110         // Info does become available.
111         nameMu      sync.RWMutex
112         displayName string
113
114         // The bencoded bytes of the info dict. This is actively manipulated if
115         // the info bytes aren't initially available, and we try to fetch them
116         // from peers.
117         metadataBytes []byte
118         // Each element corresponds to the 16KiB metadata pieces. If true, we have
119         // received that piece.
120         metadataCompletedChunks []bool
121         metadataChanged         sync.Cond
122
123         // Set when .Info is obtained.
124         gotMetainfo missinggo.Event
125
126         readers                map[*reader]struct{}
127         _readerNowPieces       bitmap.Bitmap
128         _readerReadaheadPieces bitmap.Bitmap
129
130         // A cache of pieces we need to get. Calculated from various piece and
131         // file priorities and completion states elsewhere.
132         _pendingPieces prioritybitmap.PriorityBitmap
133         // A cache of completed piece indices.
134         _completedPieces bitmap.Bitmap
135         // Pieces that need to be hashed.
136         piecesQueuedForHash bitmap.Bitmap
137         activePieceHashes   int
138
139         // A pool of piece priorities []int for assignment to new connections.
140         // These "inclinations" are used to give connections preference for
141         // different pieces.
142         connPieceInclinationPool sync.Pool
143
144         // Count of each request across active connections.
145         pendingRequests map[Request]int
146
147         pex pexState
148 }
149
150 func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
151         t.iterPeers(func(peer *Peer) {
152                 if peer.peerHasPiece(i) {
153                         count++
154                 }
155         })
156         return
157 }
158
159 func (t *Torrent) decPieceAvailability(i pieceIndex) {
160         p := t.piece(i)
161         if p.availability <= 0 {
162                 panic(p.availability)
163         }
164         p.availability--
165 }
166
167 func (t *Torrent) incPieceAvailability(i pieceIndex) {
168         p := t.piece(i)
169         p.availability++
170 }
171
172 func (t *Torrent) numConns() int {
173         return len(t.conns)
174 }
175
176 func (t *Torrent) numReaders() int {
177         return len(t.readers)
178 }
179
180 func (t *Torrent) readerNowPieces() bitmap.Bitmap {
181         return t._readerNowPieces
182 }
183
184 func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
185         return t._readerReadaheadPieces
186 }
187
188 func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
189         return !t.wantPieceIndex(i)
190 }
191
192 func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
193         return &t._pendingPieces
194 }
195
196 func (t *Torrent) tickleReaders() {
197         t.cl.event.Broadcast()
198 }
199
200 // Returns a channel that is closed when the Torrent is closed.
201 func (t *Torrent) Closed() <-chan struct{} {
202         return t.closed.LockedChan(t.cl.locker())
203 }
204
205 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
206 // pending, and half-open peers.
207 func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
208         // Add pending peers to the list
209         t.peers.Each(func(peer PeerInfo) {
210                 ks = append(ks, peer)
211         })
212
213         // Add half-open peers to the list
214         for _, peer := range t.halfOpen {
215                 ks = append(ks, peer)
216         }
217
218         // Add active peers to the list
219         for conn := range t.conns {
220
221                 ks = append(ks, PeerInfo{
222                         Id:     conn.PeerID,
223                         Addr:   conn.RemoteAddr,
224                         Source: conn.Discovery,
225                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
226                         // > But if we're not connected to them with an encrypted connection, I couldn't say
227                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
228                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
229                         // > connection. It's probably easiest to do the latter for now.
230                         // https://github.com/anacrolix/torrent/pull/188
231                         SupportsEncryption: conn.headerEncrypted,
232                 })
233         }
234
235         return
236 }
237
238 func (t *Torrent) setChunkSize(size pp.Integer) {
239         t.chunkSize = size
240         t.chunkPool = &sync.Pool{
241                 New: func() interface{} {
242                         b := make([]byte, size)
243                         return &b
244                 },
245         }
246 }
247
248 func (t *Torrent) pieceComplete(piece pieceIndex) bool {
249         return t._completedPieces.Get(bitmap.BitIndex(piece))
250 }
251
252 func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion {
253         return t.pieces[piece].Storage().Completion()
254 }
255
256 // There's a connection to that address already.
257 func (t *Torrent) addrActive(addr string) bool {
258         if _, ok := t.halfOpen[addr]; ok {
259                 return true
260         }
261         for c := range t.conns {
262                 ra := c.RemoteAddr
263                 if ra.String() == addr {
264                         return true
265                 }
266         }
267         return false
268 }
269
270 func (t *Torrent) unclosedConnsAsSlice() (ret []*PeerConn) {
271         ret = make([]*PeerConn, 0, len(t.conns))
272         for c := range t.conns {
273                 if !c.closed.IsSet() {
274                         ret = append(ret, c)
275                 }
276         }
277         return
278 }
279
280 func (t *Torrent) addPeer(p PeerInfo) (added bool) {
281         cl := t.cl
282         torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
283         if t.closed.IsSet() {
284                 return false
285         }
286         if ipAddr, ok := tryIpPortFromNetAddr(p.Addr); ok {
287                 if cl.badPeerIPPort(ipAddr.IP, ipAddr.Port) {
288                         torrent.Add("peers not added because of bad addr", 1)
289                         // cl.logger.Printf("peers not added because of bad addr: %v", p)
290                         return false
291                 }
292         }
293         if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
294                 torrent.Add("peers replaced", 1)
295                 if !replaced.equal(p) {
296                         t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
297                         added = true
298                 }
299         } else {
300                 added = true
301         }
302         t.openNewConns()
303         for t.peers.Len() > cl.config.TorrentPeersHighWater {
304                 _, ok := t.peers.DeleteMin()
305                 if ok {
306                         torrent.Add("excess reserve peers discarded", 1)
307                 }
308         }
309         return
310 }
311
312 func (t *Torrent) invalidateMetadata() {
313         for i := range t.metadataCompletedChunks {
314                 t.metadataCompletedChunks[i] = false
315         }
316         t.nameMu.Lock()
317         t.info = nil
318         t.nameMu.Unlock()
319 }
320
321 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
322         if t.haveInfo() {
323                 return
324         }
325         if index >= len(t.metadataCompletedChunks) {
326                 t.logger.Printf("%s: ignoring metadata piece %d", t, index)
327                 return
328         }
329         copy(t.metadataBytes[(1<<14)*index:], data)
330         t.metadataCompletedChunks[index] = true
331 }
332
333 func (t *Torrent) metadataPieceCount() int {
334         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
335 }
336
337 func (t *Torrent) haveMetadataPiece(piece int) bool {
338         if t.haveInfo() {
339                 return (1<<14)*piece < len(t.metadataBytes)
340         } else {
341                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
342         }
343 }
344
345 func (t *Torrent) metadataSize() int {
346         return len(t.metadataBytes)
347 }
348
349 func infoPieceHashes(info *metainfo.Info) (ret [][]byte) {
350         for i := 0; i < len(info.Pieces); i += sha1.Size {
351                 ret = append(ret, info.Pieces[i:i+sha1.Size])
352         }
353         return
354 }
355
356 func (t *Torrent) makePieces() {
357         hashes := infoPieceHashes(t.info)
358         t.pieces = make([]Piece, len(hashes))
359         for i, hash := range hashes {
360                 piece := &t.pieces[i]
361                 piece.t = t
362                 piece.index = pieceIndex(i)
363                 piece.noPendingWrites.L = &piece.pendingWritesMutex
364                 piece.hash = (*metainfo.Hash)(unsafe.Pointer(&hash[0]))
365                 files := *t.files
366                 beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
367                 endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
368                 piece.files = files[beginFile:endFile]
369         }
370 }
371
372 // Returns the index of the first file containing the piece. files must be
373 // ordered by offset.
374 func pieceFirstFileIndex(pieceOffset int64, files []*File) int {
375         for i, f := range files {
376                 if f.offset+f.length > pieceOffset {
377                         return i
378                 }
379         }
380         return 0
381 }
382
383 // Returns the index after the last file containing the piece. files must be
384 // ordered by offset.
385 func pieceEndFileIndex(pieceEndOffset int64, files []*File) int {
386         for i, f := range files {
387                 if f.offset+f.length >= pieceEndOffset {
388                         return i + 1
389                 }
390         }
391         return 0
392 }
393
394 func (t *Torrent) cacheLength() {
395         var l int64
396         for _, f := range t.info.UpvertedFiles() {
397                 l += f.Length
398         }
399         t.length = &l
400 }
401
402 // TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
403 // separately.
404 func (t *Torrent) setInfo(info *metainfo.Info) error {
405         if err := validateInfo(info); err != nil {
406                 return fmt.Errorf("bad info: %s", err)
407         }
408         if t.storageOpener != nil {
409                 var err error
410                 t.storage, err = t.storageOpener.OpenTorrent(info, t.infoHash)
411                 if err != nil {
412                         return fmt.Errorf("error opening torrent storage: %s", err)
413                 }
414         }
415         t.nameMu.Lock()
416         t.info = info
417         t.nameMu.Unlock()
418         t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
419         t.displayName = "" // Save a few bytes lol.
420         t.initFiles()
421         t.cacheLength()
422         t.makePieces()
423         return nil
424 }
425
426 // This seems to be all the follow-up tasks after info is set, that can't fail.
427 func (t *Torrent) onSetInfo() {
428         t.iterPeers(func(p *Peer) {
429                 p.onGotInfo(t.info)
430         })
431         for i := range t.pieces {
432                 t.updatePieceCompletion(pieceIndex(i))
433                 p := &t.pieces[i]
434                 if !p.storageCompletionOk {
435                         // t.logger.Printf("piece %s completion unknown, queueing check", p)
436                         t.queuePieceCheck(pieceIndex(i))
437                 }
438                 if p.availability != 0 {
439                         panic(p.availability)
440                 }
441                 p.availability = int64(t.pieceAvailabilityFromPeers(i))
442         }
443         t.cl.event.Broadcast()
444         t.gotMetainfo.Set()
445         t.updateWantPeersEvent()
446         t.pendingRequests = make(map[Request]int)
447         t.tryCreateMorePieceHashers()
448 }
449
450 // Called when metadata for a torrent becomes available.
451 func (t *Torrent) setInfoBytes(b []byte) error {
452         if metainfo.HashBytes(b) != t.infoHash {
453                 return errors.New("info bytes have wrong hash")
454         }
455         var info metainfo.Info
456         if err := bencode.Unmarshal(b, &info); err != nil {
457                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
458         }
459         t.metadataBytes = b
460         t.metadataCompletedChunks = nil
461         if t.info != nil {
462                 return nil
463         }
464         if err := t.setInfo(&info); err != nil {
465                 return err
466         }
467         t.onSetInfo()
468         return nil
469 }
470
471 func (t *Torrent) haveAllMetadataPieces() bool {
472         if t.haveInfo() {
473                 return true
474         }
475         if t.metadataCompletedChunks == nil {
476                 return false
477         }
478         for _, have := range t.metadataCompletedChunks {
479                 if !have {
480                         return false
481                 }
482         }
483         return true
484 }
485
486 // TODO: Propagate errors to disconnect peer.
487 func (t *Torrent) setMetadataSize(bytes int) (err error) {
488         if t.haveInfo() {
489                 // We already know the correct metadata size.
490                 return
491         }
492         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
493                 return errors.New("bad size")
494         }
495         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
496                 return
497         }
498         t.metadataBytes = make([]byte, bytes)
499         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
500         t.metadataChanged.Broadcast()
501         for c := range t.conns {
502                 c.requestPendingMetadata()
503         }
504         return
505 }
506
507 // The current working name for the torrent. Either the name in the info dict,
508 // or a display name given such as by the dn value in a magnet link, or "".
509 func (t *Torrent) name() string {
510         t.nameMu.RLock()
511         defer t.nameMu.RUnlock()
512         if t.haveInfo() {
513                 return t.info.Name
514         }
515         if t.displayName != "" {
516                 return t.displayName
517         }
518         return "infohash:" + t.infoHash.HexString()
519 }
520
521 func (t *Torrent) pieceState(index pieceIndex) (ret PieceState) {
522         p := &t.pieces[index]
523         ret.Priority = t.piecePriority(index)
524         ret.Completion = p.completion()
525         ret.QueuedForHash = p.queuedForHash()
526         ret.Hashing = p.hashing
527         ret.Checking = ret.QueuedForHash || ret.Hashing
528         ret.Marking = p.marking
529         if !ret.Complete && t.piecePartiallyDownloaded(index) {
530                 ret.Partial = true
531         }
532         return
533 }
534
535 func (t *Torrent) metadataPieceSize(piece int) int {
536         return metadataPieceSize(len(t.metadataBytes), piece)
537 }
538
539 func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType int, piece int, data []byte) pp.Message {
540         d := map[string]int{
541                 "msg_type": msgType,
542                 "piece":    piece,
543         }
544         if data != nil {
545                 d["total_size"] = len(t.metadataBytes)
546         }
547         p := bencode.MustMarshal(d)
548         return pp.Message{
549                 Type:            pp.Extended,
550                 ExtendedID:      c.PeerExtensionIDs[pp.ExtensionNameMetadata],
551                 ExtendedPayload: append(p, data...),
552         }
553 }
554
555 type pieceAvailabilityRun struct {
556         availability int64
557         count        pieceIndex
558 }
559
560 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
561         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
562                 ret = append(ret, pieceAvailabilityRun{el.(int64), int(count)})
563         })
564         for _, p := range t.pieces {
565                 rle.Append(p.availability, 1)
566         }
567         rle.Flush()
568         return
569 }
570
571 func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
572         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
573                 ret = append(ret, PieceStateRun{
574                         PieceState: el.(PieceState),
575                         Length:     int(count),
576                 })
577         })
578         for index := range t.pieces {
579                 rle.Append(t.pieceState(pieceIndex(index)), 1)
580         }
581         rle.Flush()
582         return
583 }
584
585 // Produces a small string representing a PieceStateRun.
586 func (psr PieceStateRun) String() (ret string) {
587         ret = fmt.Sprintf("%d", psr.Length)
588         ret += func() string {
589                 switch psr.Priority {
590                 case PiecePriorityNext:
591                         return "N"
592                 case PiecePriorityNormal:
593                         return "."
594                 case PiecePriorityReadahead:
595                         return "R"
596                 case PiecePriorityNow:
597                         return "!"
598                 case PiecePriorityHigh:
599                         return "H"
600                 default:
601                         return ""
602                 }
603         }()
604         if psr.Hashing {
605                 ret += "H"
606         }
607         if psr.QueuedForHash {
608                 ret += "Q"
609         }
610         if psr.Marking {
611                 ret += "M"
612         }
613         if psr.Partial {
614                 ret += "P"
615         }
616         if psr.Complete {
617                 ret += "C"
618         }
619         if !psr.Ok {
620                 ret += "?"
621         }
622         return
623 }
624
625 func (t *Torrent) writeStatus(w io.Writer) {
626         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
627         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
628         if !t.haveInfo() {
629                 fmt.Fprintf(w, "Metadata have: ")
630                 for _, h := range t.metadataCompletedChunks {
631                         fmt.Fprintf(w, "%c", func() rune {
632                                 if h {
633                                         return 'H'
634                                 } else {
635                                         return '.'
636                                 }
637                         }())
638                 }
639                 fmt.Fprintln(w)
640         }
641         fmt.Fprintf(w, "Piece length: %s\n",
642                 func() string {
643                         if t.haveInfo() {
644                                 return fmt.Sprintf("%v (%v chunks)",
645                                         t.usualPieceSize(),
646                                         float64(t.usualPieceSize())/float64(t.chunkSize))
647                         } else {
648                                 return "no info"
649                         }
650                 }(),
651         )
652         if t.info != nil {
653                 fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
654                 fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
655                 fmt.Fprintf(w, "Piece availability: %v\n", t.pieceAvailabilityRuns())
656         }
657         fmt.Fprintf(w, "Reader Pieces:")
658         t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
659                 fmt.Fprintf(w, " %d:%d", begin, end)
660                 return true
661         })
662         fmt.Fprintln(w)
663
664         fmt.Fprintf(w, "Enabled trackers:\n")
665         func() {
666                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
667                 fmt.Fprintf(tw, "    URL\tExtra\n")
668                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r torrentTrackerAnnouncer) bool {
669                         lu := l.URL()
670                         ru := r.URL()
671                         var luns, runs url.URL = *lu, *ru
672                         luns.Scheme = ""
673                         runs.Scheme = ""
674                         var ml missinggo.MultiLess
675                         ml.StrictNext(luns.String() == runs.String(), luns.String() < runs.String())
676                         ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
677                         return ml.Less()
678                 }).([]torrentTrackerAnnouncer) {
679                         fmt.Fprintf(tw, "    %q\t%v\n", ta.URL(), ta.statusLine())
680                 }
681                 tw.Flush()
682         }()
683
684         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
685
686         spew.NewDefaultConfig()
687         spew.Fdump(w, t.statsLocked())
688
689         peers := t.peersAsSlice()
690         sort.Slice(peers, func(_i, _j int) bool {
691                 i := peers[_i]
692                 j := peers[_j]
693                 if less, ok := multiless.New().EagerSameLess(
694                         i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
695                 ).LessOk(); ok {
696                         return less
697                 }
698                 return worseConn(i, j)
699         })
700         for i, c := range peers {
701                 fmt.Fprintf(w, "%2d. ", i+1)
702                 c.writeStatus(w, t)
703         }
704 }
705
706 func (t *Torrent) haveInfo() bool {
707         return t.info != nil
708 }
709
710 // Returns a run-time generated MetaInfo that includes the info bytes and
711 // announce-list as currently known to the client.
712 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
713         return metainfo.MetaInfo{
714                 CreationDate: time.Now().Unix(),
715                 Comment:      "dynamic metainfo from client",
716                 CreatedBy:    "go.torrent",
717                 AnnounceList: t.metainfo.UpvertedAnnounceList().Clone(),
718                 InfoBytes: func() []byte {
719                         if t.haveInfo() {
720                                 return t.metadataBytes
721                         } else {
722                                 return nil
723                         }
724                 }(),
725                 UrlList: func() []string {
726                         ret := make([]string, 0, len(t.webSeeds))
727                         for url := range t.webSeeds {
728                                 ret = append(ret, url)
729                         }
730                         return ret
731                 }(),
732         }
733 }
734
735 func (t *Torrent) BytesMissing() int64 {
736         t.cl.rLock()
737         defer t.cl.rUnlock()
738         return t.bytesMissingLocked()
739 }
740
741 func (t *Torrent) bytesMissingLocked() int64 {
742         return t.bytesLeft()
743 }
744
745 func (t *Torrent) bytesLeft() (left int64) {
746         bitmap.Flip(t._completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool {
747                 p := &t.pieces[piece]
748                 left += int64(p.length() - p.numDirtyBytes())
749                 return true
750         })
751         return
752 }
753
754 // Bytes left to give in tracker announces.
755 func (t *Torrent) bytesLeftAnnounce() int64 {
756         if t.haveInfo() {
757                 return t.bytesLeft()
758         } else {
759                 return -1
760         }
761 }
762
763 func (t *Torrent) piecePartiallyDownloaded(piece pieceIndex) bool {
764         if t.pieceComplete(piece) {
765                 return false
766         }
767         if t.pieceAllDirty(piece) {
768                 return false
769         }
770         return t.pieces[piece].hasDirtyChunks()
771 }
772
773 func (t *Torrent) usualPieceSize() int {
774         return int(t.info.PieceLength)
775 }
776
777 func (t *Torrent) numPieces() pieceIndex {
778         return pieceIndex(t.info.NumPieces())
779 }
780
781 func (t *Torrent) numPiecesCompleted() (num int) {
782         return t._completedPieces.Len()
783 }
784
785 func (t *Torrent) close() (err error) {
786         t.closed.Set()
787         t.tickleReaders()
788         if t.storage != nil {
789                 func() {
790                         t.storageLock.Lock()
791                         defer t.storageLock.Unlock()
792                         if f := t.storage.Close; f != nil {
793                                 f()
794                         }
795                 }()
796         }
797         t.iterPeers(func(p *Peer) {
798                 p.close()
799         })
800         t.pex.Reset()
801         t.cl.event.Broadcast()
802         t.pieceStateChanges.Close()
803         t.updateWantPeersEvent()
804         return
805 }
806
807 func (t *Torrent) requestOffset(r Request) int64 {
808         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
809 }
810
811 // Return the request that would include the given offset into the torrent data. Returns !ok if
812 // there is no such request.
813 func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
814         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
815 }
816
817 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
818         defer perf.ScopeTimerErr(&err)()
819         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
820         if err == nil && n != len(data) {
821                 err = io.ErrShortWrite
822         }
823         return err
824 }
825
826 func (t *Torrent) bitfield() (bf []bool) {
827         bf = make([]bool, t.numPieces())
828         t._completedPieces.IterTyped(func(piece int) (again bool) {
829                 bf[piece] = true
830                 return true
831         })
832         return
833 }
834
835 func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
836         return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
837 }
838
839 func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
840         t.pieces[pieceIndex]._dirtyChunks.Clear()
841 }
842
843 func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
844         if t.info.PieceLength == 0 {
845                 // There will be no variance amongst pieces. Only pain.
846                 return 0
847         }
848         if piece == t.numPieces()-1 {
849                 ret := pp.Integer(*t.length % t.info.PieceLength)
850                 if ret != 0 {
851                         return ret
852                 }
853         }
854         return pp.Integer(t.info.PieceLength)
855 }
856
857 func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
858         hash := pieceHash.New()
859         p := t.piece(piece)
860         p.waitNoPendingWrites()
861         storagePiece := t.pieces[piece].Storage()
862         const logPieceContents = false
863         if logPieceContents {
864                 var examineBuf bytes.Buffer
865                 _, err = storagePiece.WriteTo(io.MultiWriter(hash, &examineBuf))
866                 log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
867         } else {
868                 _, err = storagePiece.WriteTo(hash)
869         }
870         missinggo.CopyExact(&ret, hash.Sum(nil))
871         return
872 }
873
874 func (t *Torrent) haveAnyPieces() bool {
875         return t._completedPieces.Len() != 0
876 }
877
878 func (t *Torrent) haveAllPieces() bool {
879         if !t.haveInfo() {
880                 return false
881         }
882         return t._completedPieces.Len() == bitmap.BitIndex(t.numPieces())
883 }
884
885 func (t *Torrent) havePiece(index pieceIndex) bool {
886         return t.haveInfo() && t.pieceComplete(index)
887 }
888
889 func (t *Torrent) maybeDropMutuallyCompletePeer(
890         // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's okay?
891         p *Peer,
892 ) {
893         if !t.cl.config.DropMutuallyCompletePeers {
894                 return
895         }
896         if !t.haveAllPieces() {
897                 return
898         }
899         if all, known := p.peerHasAllPieces(); !(known && all) {
900                 return
901         }
902         if p.useful() {
903                 return
904         }
905         t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p)
906         p.drop()
907 }
908
909 func (t *Torrent) haveChunk(r Request) (ret bool) {
910         // defer func() {
911         //      log.Println("have chunk", r, ret)
912         // }()
913         if !t.haveInfo() {
914                 return false
915         }
916         if t.pieceComplete(pieceIndex(r.Index)) {
917                 return true
918         }
919         p := &t.pieces[r.Index]
920         return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
921 }
922
923 func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int {
924         return int(cs.Begin / chunkSize)
925 }
926
927 func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
928         // TODO: Are these overly conservative, should we be guarding this here?
929         {
930                 if !t.haveInfo() {
931                         return false
932                 }
933                 if index < 0 || index >= t.numPieces() {
934                         return false
935                 }
936         }
937         p := &t.pieces[index]
938         if p.queuedForHash() {
939                 return false
940         }
941         if p.hashing {
942                 return false
943         }
944         if t.pieceComplete(index) {
945                 return false
946         }
947         if t._pendingPieces.Contains(bitmap.BitIndex(index)) {
948                 return true
949         }
950         // t.logger.Printf("piece %d not pending", index)
951         return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
952                 return index < begin || index >= end
953         })
954 }
955
956 // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
957 // connection is one that usually sends us unwanted pieces, or has been in worser half of the
958 // established connections for more than a minute.
959 func (t *Torrent) worstBadConn() *PeerConn {
960         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
961         heap.Init(&wcs)
962         for wcs.Len() != 0 {
963                 c := heap.Pop(&wcs).(*PeerConn)
964                 if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
965                         return c
966                 }
967                 // If the connection is in the worst half of the established
968                 // connection quota and is older than a minute.
969                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
970                         // Give connections 1 minute to prove themselves.
971                         if time.Since(c.completedHandshake) > time.Minute {
972                                 return c
973                         }
974                 }
975         }
976         return nil
977 }
978
979 type PieceStateChange struct {
980         Index int
981         PieceState
982 }
983
984 func (t *Torrent) publishPieceChange(piece pieceIndex) {
985         t.cl._mu.Defer(func() {
986                 cur := t.pieceState(piece)
987                 p := &t.pieces[piece]
988                 if cur != p.publicPieceState {
989                         p.publicPieceState = cur
990                         t.pieceStateChanges.Publish(PieceStateChange{
991                                 int(piece),
992                                 cur,
993                         })
994                 }
995         })
996 }
997
998 func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
999         if t.pieceComplete(piece) {
1000                 return 0
1001         }
1002         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
1003 }
1004
1005 func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
1006         return t.pieces[piece]._dirtyChunks.Len() == int(t.pieceNumChunks(piece))
1007 }
1008
1009 func (t *Torrent) readersChanged() {
1010         t.updateReaderPieces()
1011         t.updateAllPiecePriorities()
1012 }
1013
1014 func (t *Torrent) updateReaderPieces() {
1015         t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities()
1016 }
1017
1018 func (t *Torrent) readerPosChanged(from, to pieceRange) {
1019         if from == to {
1020                 return
1021         }
1022         t.updateReaderPieces()
1023         // Order the ranges, high and low.
1024         l, h := from, to
1025         if l.begin > h.begin {
1026                 l, h = h, l
1027         }
1028         if l.end < h.begin {
1029                 // Two distinct ranges.
1030                 t.updatePiecePriorities(l.begin, l.end)
1031                 t.updatePiecePriorities(h.begin, h.end)
1032         } else {
1033                 // Ranges overlap.
1034                 end := l.end
1035                 if h.end > end {
1036                         end = h.end
1037                 }
1038                 t.updatePiecePriorities(l.begin, end)
1039         }
1040 }
1041
1042 func (t *Torrent) maybeNewConns() {
1043         // Tickle the accept routine.
1044         t.cl.event.Broadcast()
1045         t.openNewConns()
1046 }
1047
1048 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
1049         // t.logger.Printf("piece %d priority changed", piece)
1050         t.iterPeers(func(c *Peer) {
1051                 if c.updatePiecePriority(piece) {
1052                         // log.Print("conn piece priority changed")
1053                         c.updateRequests()
1054                 }
1055         })
1056         t.maybeNewConns()
1057         t.publishPieceChange(piece)
1058 }
1059
1060 func (t *Torrent) updatePiecePriority(piece pieceIndex) {
1061         p := &t.pieces[piece]
1062         newPrio := p.uncachedPriority()
1063         // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
1064         if newPrio == PiecePriorityNone {
1065                 if !t._pendingPieces.Remove(bitmap.BitIndex(piece)) {
1066                         return
1067                 }
1068         } else {
1069                 if !t._pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) {
1070                         return
1071                 }
1072         }
1073         t.piecePriorityChanged(piece)
1074 }
1075
1076 func (t *Torrent) updateAllPiecePriorities() {
1077         t.updatePiecePriorities(0, t.numPieces())
1078 }
1079
1080 // Update all piece priorities in one hit. This function should have the same
1081 // output as updatePiecePriority, but across all pieces.
1082 func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) {
1083         for i := begin; i < end; i++ {
1084                 t.updatePiecePriority(i)
1085         }
1086 }
1087
1088 // Returns the range of pieces [begin, end) that contains the extent of bytes.
1089 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
1090         if off >= *t.length {
1091                 return
1092         }
1093         if off < 0 {
1094                 size += off
1095                 off = 0
1096         }
1097         if size <= 0 {
1098                 return
1099         }
1100         begin = pieceIndex(off / t.info.PieceLength)
1101         end = pieceIndex((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
1102         if end > pieceIndex(t.info.NumPieces()) {
1103                 end = pieceIndex(t.info.NumPieces())
1104         }
1105         return
1106 }
1107
1108 // Returns true if all iterations complete without breaking. Returns the read
1109 // regions for all readers. The reader regions should not be merged as some
1110 // callers depend on this method to enumerate readers.
1111 func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool)) (all bool) {
1112         for r := range t.readers {
1113                 p := r.pieces
1114                 if p.begin >= p.end {
1115                         continue
1116                 }
1117                 if !f(p.begin, p.end) {
1118                         return false
1119                 }
1120         }
1121         return true
1122 }
1123
1124 func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
1125         prio, ok := t._pendingPieces.GetPriority(bitmap.BitIndex(piece))
1126         if !ok {
1127                 return PiecePriorityNone
1128         }
1129         if prio > 0 {
1130                 panic(prio)
1131         }
1132         ret := piecePriority(-prio)
1133         if ret == PiecePriorityNone {
1134                 panic(piece)
1135         }
1136         return ret
1137 }
1138
1139 func (t *Torrent) pendRequest(req Request) {
1140         ci := chunkIndex(req.ChunkSpec, t.chunkSize)
1141         t.pieces[req.Index].pendChunkIndex(ci)
1142 }
1143
1144 func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
1145         t.tickleReaders()
1146         t.cl.event.Broadcast()
1147         if t.pieceComplete(piece) {
1148                 t.onPieceCompleted(piece)
1149         } else {
1150                 t.onIncompletePiece(piece)
1151         }
1152         t.updatePiecePriority(piece)
1153 }
1154
1155 func (t *Torrent) numReceivedConns() (ret int) {
1156         for c := range t.conns {
1157                 if c.Discovery == PeerSourceIncoming {
1158                         ret++
1159                 }
1160         }
1161         return
1162 }
1163
1164 func (t *Torrent) maxHalfOpen() int {
1165         // Note that if we somehow exceed the maximum established conns, we want
1166         // the negative value to have an effect.
1167         establishedHeadroom := int64(t.maxEstablishedConns - len(t.conns))
1168         extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
1169         // We want to allow some experimentation with new peers, and to try to
1170         // upset an oversupply of received connections.
1171         return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
1172 }
1173
1174 func (t *Torrent) openNewConns() (initiated int) {
1175         defer t.updateWantPeersEvent()
1176         for t.peers.Len() != 0 {
1177                 if !t.wantConns() {
1178                         return
1179                 }
1180                 if len(t.halfOpen) >= t.maxHalfOpen() {
1181                         return
1182                 }
1183                 if len(t.cl.dialers) == 0 {
1184                         return
1185                 }
1186                 if t.cl.numHalfOpen >= t.cl.config.TotalHalfOpenConns {
1187                         return
1188                 }
1189                 p := t.peers.PopMax()
1190                 t.initiateConn(p)
1191                 initiated++
1192         }
1193         return
1194 }
1195
1196 func (t *Torrent) getConnPieceInclination() []int {
1197         _ret := t.connPieceInclinationPool.Get()
1198         if _ret == nil {
1199                 pieceInclinationsNew.Add(1)
1200                 return rand.Perm(int(t.numPieces()))
1201         }
1202         pieceInclinationsReused.Add(1)
1203         return *_ret.(*[]int)
1204 }
1205
1206 func (t *Torrent) putPieceInclination(pi []int) {
1207         t.connPieceInclinationPool.Put(&pi)
1208         pieceInclinationsPut.Add(1)
1209 }
1210
1211 func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
1212         p := t.piece(piece)
1213         uncached := t.pieceCompleteUncached(piece)
1214         cached := p.completion()
1215         changed := cached != uncached
1216         complete := uncached.Complete
1217         p.storageCompletionOk = uncached.Ok
1218         t._completedPieces.Set(bitmap.BitIndex(piece), complete)
1219         if complete && len(p.dirtiers) != 0 {
1220                 t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
1221         }
1222         if changed {
1223                 log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger)
1224                 t.pieceCompletionChanged(piece)
1225         }
1226         return changed
1227 }
1228
1229 // Non-blocking read. Client lock is not required.
1230 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1231         for len(b) != 0 {
1232                 p := &t.pieces[off/t.info.PieceLength]
1233                 p.waitNoPendingWrites()
1234                 var n1 int
1235                 n1, err = p.Storage().ReadAt(b, off-p.Info().Offset())
1236                 if n1 == 0 {
1237                         break
1238                 }
1239                 off += int64(n1)
1240                 n += n1
1241                 b = b[n1:]
1242         }
1243         return
1244 }
1245
1246 // Returns an error if the metadata was completed, but couldn't be set for some reason. Blame it on
1247 // the last peer to contribute. TODO: Actually we shouldn't blame peers for failure to open storage
1248 // etc. Also we should probably cached metadata pieces per-Peer, to isolate failure appropriately.
1249 func (t *Torrent) maybeCompleteMetadata() error {
1250         if t.haveInfo() {
1251                 // Nothing to do.
1252                 return nil
1253         }
1254         if !t.haveAllMetadataPieces() {
1255                 // Don't have enough metadata pieces.
1256                 return nil
1257         }
1258         err := t.setInfoBytes(t.metadataBytes)
1259         if err != nil {
1260                 t.invalidateMetadata()
1261                 return fmt.Errorf("error setting info bytes: %s", err)
1262         }
1263         if t.cl.config.Debug {
1264                 t.logger.Printf("%s: got metadata from peers", t)
1265         }
1266         return nil
1267 }
1268
1269 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1270         t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
1271                 if end > begin {
1272                         now.Add(bitmap.BitIndex(begin))
1273                         readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end))
1274                 }
1275                 return true
1276         })
1277         return
1278 }
1279
1280 func (t *Torrent) needData() bool {
1281         if t.closed.IsSet() {
1282                 return false
1283         }
1284         if !t.haveInfo() {
1285                 return true
1286         }
1287         return t._pendingPieces.Len() != 0
1288 }
1289
1290 func appendMissingStrings(old, new []string) (ret []string) {
1291         ret = old
1292 new:
1293         for _, n := range new {
1294                 for _, o := range old {
1295                         if o == n {
1296                                 continue new
1297                         }
1298                 }
1299                 ret = append(ret, n)
1300         }
1301         return
1302 }
1303
1304 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1305         ret = existing
1306         for minNumTiers > len(ret) {
1307                 ret = append(ret, nil)
1308         }
1309         return
1310 }
1311
1312 func (t *Torrent) addTrackers(announceList [][]string) {
1313         fullAnnounceList := &t.metainfo.AnnounceList
1314         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1315         for tierIndex, trackerURLs := range announceList {
1316                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1317         }
1318         t.startMissingTrackerScrapers()
1319         t.updateWantPeersEvent()
1320 }
1321
1322 // Don't call this before the info is available.
1323 func (t *Torrent) bytesCompleted() int64 {
1324         if !t.haveInfo() {
1325                 return 0
1326         }
1327         return t.info.TotalLength() - t.bytesLeft()
1328 }
1329
1330 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1331         t.cl.lock()
1332         defer t.cl.unlock()
1333         return t.setInfoBytes(b)
1334 }
1335
1336 // Returns true if connection is removed from torrent.Conns.
1337 func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
1338         if !c.closed.IsSet() {
1339                 panic("connection is not closed")
1340                 // There are behaviours prevented by the closed state that will fail
1341                 // if the connection has been deleted.
1342         }
1343         _, ret = t.conns[c]
1344         delete(t.conns, c)
1345         // Avoid adding a drop event more than once. Probably we should track whether we've generated
1346         // the drop event against the PexConnState instead.
1347         if ret {
1348                 if !t.cl.config.DisablePEX {
1349                         t.pex.Drop(c)
1350                 }
1351         }
1352         torrent.Add("deleted connections", 1)
1353         c.deleteAllRequests()
1354         if t.numActivePeers() == 0 {
1355                 t.assertNoPendingRequests()
1356         }
1357         return
1358 }
1359
1360 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
1361         p.newPeerPieces().IterTyped(func(i int) bool {
1362                 p.t.decPieceAvailability(i)
1363                 return true
1364         })
1365 }
1366
1367 func (t *Torrent) numActivePeers() (num int) {
1368         t.iterPeers(func(*Peer) {
1369                 num++
1370         })
1371         return
1372 }
1373
1374 func (t *Torrent) assertNoPendingRequests() {
1375         if len(t.pendingRequests) != 0 {
1376                 panic(t.pendingRequests)
1377         }
1378         //if len(t.lastRequested) != 0 {
1379         //      panic(t.lastRequested)
1380         //}
1381 }
1382
1383 func (t *Torrent) dropConnection(c *PeerConn) {
1384         t.cl.event.Broadcast()
1385         c.close()
1386         if t.deletePeerConn(c) {
1387                 t.openNewConns()
1388         }
1389 }
1390
1391 func (t *Torrent) wantPeers() bool {
1392         if t.closed.IsSet() {
1393                 return false
1394         }
1395         if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
1396                 return false
1397         }
1398         return t.needData() || t.seeding()
1399 }
1400
1401 func (t *Torrent) updateWantPeersEvent() {
1402         if t.wantPeers() {
1403                 t.wantPeersEvent.Set()
1404         } else {
1405                 t.wantPeersEvent.Clear()
1406         }
1407 }
1408
1409 // Returns whether the client should make effort to seed the torrent.
1410 func (t *Torrent) seeding() bool {
1411         cl := t.cl
1412         if t.closed.IsSet() {
1413                 return false
1414         }
1415         if t.dataUploadDisallowed {
1416                 return false
1417         }
1418         if cl.config.NoUpload {
1419                 return false
1420         }
1421         if !cl.config.Seed {
1422                 return false
1423         }
1424         if cl.config.DisableAggressiveUpload && t.needData() {
1425                 return false
1426         }
1427         return true
1428 }
1429
1430 func (t *Torrent) onWebRtcConn(
1431         c datachannel.ReadWriteCloser,
1432         dcc webtorrent.DataChannelContext,
1433 ) {
1434         defer c.Close()
1435         pc, err := t.cl.initiateProtocolHandshakes(
1436                 context.Background(),
1437                 webrtcNetConn{c, dcc},
1438                 t,
1439                 dcc.LocalOffered,
1440                 false,
1441                 webrtcNetAddr{dcc.Remote},
1442                 webrtcNetwork,
1443                 fmt.Sprintf("webrtc offer_id %x", dcc.OfferId),
1444         )
1445         if err != nil {
1446                 t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
1447                 return
1448         }
1449         if dcc.LocalOffered {
1450                 pc.Discovery = PeerSourceTracker
1451         } else {
1452                 pc.Discovery = PeerSourceIncoming
1453         }
1454         t.cl.lock()
1455         defer t.cl.unlock()
1456         err = t.cl.runHandshookConn(pc, t)
1457         if err != nil {
1458                 t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
1459         }
1460 }
1461
1462 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
1463         err := t.cl.runHandshookConn(pc, t)
1464         if err != nil || logAll {
1465                 t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
1466         }
1467 }
1468
1469 func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
1470         t.logRunHandshookConn(pc, false, log.Debug)
1471 }
1472
1473 func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
1474         wtc, release := t.cl.websocketTrackers.Get(u.String())
1475         go func() {
1476                 <-t.closed.LockedChan(t.cl.locker())
1477                 release()
1478         }()
1479         wst := websocketTrackerStatus{u, wtc}
1480         go func() {
1481                 err := wtc.Announce(tracker.Started, t.infoHash)
1482                 if err != nil {
1483                         t.logger.WithDefaultLevel(log.Warning).Printf(
1484                                 "error in initial announce to %q: %v",
1485                                 u.String(), err,
1486                         )
1487                 }
1488         }()
1489         return wst
1490
1491 }
1492
1493 func (t *Torrent) startScrapingTracker(_url string) {
1494         if _url == "" {
1495                 return
1496         }
1497         u, err := url.Parse(_url)
1498         if err != nil {
1499                 // URLs with a leading '*' appear to be a uTorrent convention to
1500                 // disable trackers.
1501                 if _url[0] != '*' {
1502                         log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger)
1503                 }
1504                 return
1505         }
1506         if u.Scheme == "udp" {
1507                 u.Scheme = "udp4"
1508                 t.startScrapingTracker(u.String())
1509                 u.Scheme = "udp6"
1510                 t.startScrapingTracker(u.String())
1511                 return
1512         }
1513         if _, ok := t.trackerAnnouncers[_url]; ok {
1514                 return
1515         }
1516         sl := func() torrentTrackerAnnouncer {
1517                 switch u.Scheme {
1518                 case "ws", "wss":
1519                         if t.cl.config.DisableWebtorrent {
1520                                 return nil
1521                         }
1522                         return t.startWebsocketAnnouncer(*u)
1523                 case "udp4":
1524                         if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
1525                                 return nil
1526                         }
1527                 case "udp6":
1528                         if t.cl.config.DisableIPv6 {
1529                                 return nil
1530                         }
1531                 }
1532                 newAnnouncer := &trackerScraper{
1533                         u: *u,
1534                         t: t,
1535                 }
1536                 go newAnnouncer.Run()
1537                 return newAnnouncer
1538         }()
1539         if sl == nil {
1540                 return
1541         }
1542         if t.trackerAnnouncers == nil {
1543                 t.trackerAnnouncers = make(map[string]torrentTrackerAnnouncer)
1544         }
1545         t.trackerAnnouncers[_url] = sl
1546 }
1547
1548 // Adds and starts tracker scrapers for tracker URLs that aren't already
1549 // running.
1550 func (t *Torrent) startMissingTrackerScrapers() {
1551         if t.cl.config.DisableTrackers {
1552                 return
1553         }
1554         t.startScrapingTracker(t.metainfo.Announce)
1555         for _, tier := range t.metainfo.AnnounceList {
1556                 for _, url := range tier {
1557                         t.startScrapingTracker(url)
1558                 }
1559         }
1560 }
1561
1562 // Returns an AnnounceRequest with fields filled out to defaults and current
1563 // values.
1564 func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceRequest {
1565         // Note that IPAddress is not set. It's set for UDP inside the tracker code, since it's
1566         // dependent on the network in use.
1567         return tracker.AnnounceRequest{
1568                 Event: event,
1569                 NumWant: func() int32 {
1570                         if t.wantPeers() && len(t.cl.dialers) > 0 {
1571                                 return -1
1572                         } else {
1573                                 return 0
1574                         }
1575                 }(),
1576                 Port:     uint16(t.cl.incomingPeerPort()),
1577                 PeerId:   t.cl.peerID,
1578                 InfoHash: t.infoHash,
1579                 Key:      t.cl.announceKey(),
1580
1581                 // The following are vaguely described in BEP 3.
1582
1583                 Left:     t.bytesLeftAnnounce(),
1584                 Uploaded: t.stats.BytesWrittenData.Int64(),
1585                 // There's no mention of wasted or unwanted download in the BEP.
1586                 Downloaded: t.stats.BytesReadUsefulData.Int64(),
1587         }
1588 }
1589
1590 // Adds peers revealed in an announce until the announce ends, or we have
1591 // enough peers.
1592 func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
1593         cl := t.cl
1594         for v := range pvs {
1595                 cl.lock()
1596                 added := 0
1597                 for _, cp := range v.Peers {
1598                         if cp.Port == 0 {
1599                                 // Can't do anything with this.
1600                                 continue
1601                         }
1602                         if t.addPeer(PeerInfo{
1603                                 Addr:   ipPortAddr{cp.IP, cp.Port},
1604                                 Source: PeerSourceDhtGetPeers,
1605                         }) {
1606                                 added++
1607                         }
1608                 }
1609                 cl.unlock()
1610                 if added != 0 {
1611                         //log.Printf("added %v peers from dht for %v", added, t.InfoHash().HexString())
1612                 }
1613         }
1614 }
1615
1616 func (t *Torrent) announceToDht(impliedPort bool, s DhtServer) error {
1617         ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
1618         if err != nil {
1619                 return err
1620         }
1621         go t.consumeDhtAnnouncePeers(ps.Peers())
1622         select {
1623         case <-t.closed.LockedChan(t.cl.locker()):
1624         case <-time.After(5 * time.Minute):
1625         }
1626         ps.Close()
1627         return nil
1628 }
1629
1630 func (t *Torrent) dhtAnnouncer(s DhtServer) {
1631         cl := t.cl
1632         cl.lock()
1633         defer cl.unlock()
1634         for {
1635                 for {
1636                         if t.closed.IsSet() {
1637                                 return
1638                         }
1639                         if !t.wantPeers() {
1640                                 goto wait
1641                         }
1642                         // TODO: Determine if there's a listener on the port we're announcing.
1643                         if len(cl.dialers) == 0 && len(cl.listeners) == 0 {
1644                                 goto wait
1645                         }
1646                         break
1647                 wait:
1648                         cl.event.Wait()
1649                 }
1650                 func() {
1651                         t.numDHTAnnounces++
1652                         cl.unlock()
1653                         defer cl.lock()
1654                         err := t.announceToDht(true, s)
1655                         if err != nil {
1656                                 t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
1657                         }
1658                 }()
1659         }
1660 }
1661
1662 func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
1663         for _, p := range peers {
1664                 if t.addPeer(p) {
1665                         added++
1666                 }
1667         }
1668         return
1669 }
1670
1671 // The returned TorrentStats may require alignment in memory. See
1672 // https://github.com/anacrolix/torrent/issues/383.
1673 func (t *Torrent) Stats() TorrentStats {
1674         t.cl.rLock()
1675         defer t.cl.rUnlock()
1676         return t.statsLocked()
1677 }
1678
1679 func (t *Torrent) statsLocked() (ret TorrentStats) {
1680         ret.ActivePeers = len(t.conns)
1681         ret.HalfOpenPeers = len(t.halfOpen)
1682         ret.PendingPeers = t.peers.Len()
1683         ret.TotalPeers = t.numTotalPeers()
1684         ret.ConnectedSeeders = 0
1685         for c := range t.conns {
1686                 if all, ok := c.peerHasAllPieces(); all && ok {
1687                         ret.ConnectedSeeders++
1688                 }
1689         }
1690         ret.ConnStats = t.stats.Copy()
1691         return
1692 }
1693
1694 // The total number of peers in the torrent.
1695 func (t *Torrent) numTotalPeers() int {
1696         peers := make(map[string]struct{})
1697         for conn := range t.conns {
1698                 ra := conn.conn.RemoteAddr()
1699                 if ra == nil {
1700                         // It's been closed and doesn't support RemoteAddr.
1701                         continue
1702                 }
1703                 peers[ra.String()] = struct{}{}
1704         }
1705         for addr := range t.halfOpen {
1706                 peers[addr] = struct{}{}
1707         }
1708         t.peers.Each(func(peer PeerInfo) {
1709                 peers[peer.Addr.String()] = struct{}{}
1710         })
1711         return len(peers)
1712 }
1713
1714 // Reconcile bytes transferred before connection was associated with a
1715 // torrent.
1716 func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
1717         if c._stats != (ConnStats{
1718                 // Handshakes should only increment these fields:
1719                 BytesWritten: c._stats.BytesWritten,
1720                 BytesRead:    c._stats.BytesRead,
1721         }) {
1722                 panic("bad stats")
1723         }
1724         c.postHandshakeStats(func(cs *ConnStats) {
1725                 cs.BytesRead.Add(c._stats.BytesRead.Int64())
1726                 cs.BytesWritten.Add(c._stats.BytesWritten.Int64())
1727         })
1728         c.reconciledHandshakeStats = true
1729 }
1730
1731 // Returns true if the connection is added.
1732 func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
1733         defer func() {
1734                 if err == nil {
1735                         torrent.Add("added connections", 1)
1736                 }
1737         }()
1738         if t.closed.IsSet() {
1739                 return errors.New("torrent closed")
1740         }
1741         for c0 := range t.conns {
1742                 if c.PeerID != c0.PeerID {
1743                         continue
1744                 }
1745                 if !t.cl.config.DropDuplicatePeerIds {
1746                         continue
1747                 }
1748                 if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
1749                         c0.close()
1750                         t.deletePeerConn(c0)
1751                 } else {
1752                         return errors.New("existing connection preferred")
1753                 }
1754         }
1755         if len(t.conns) >= t.maxEstablishedConns {
1756                 c := t.worstBadConn()
1757                 if c == nil {
1758                         return errors.New("don't want conns")
1759                 }
1760                 c.close()
1761                 t.deletePeerConn(c)
1762         }
1763         if len(t.conns) >= t.maxEstablishedConns {
1764                 panic(len(t.conns))
1765         }
1766         t.conns[c] = struct{}{}
1767         if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
1768                 t.pex.Add(c) // as no further extended handshake expected
1769         }
1770         return nil
1771 }
1772
1773 func (t *Torrent) wantConns() bool {
1774         if !t.networkingEnabled {
1775                 return false
1776         }
1777         if t.closed.IsSet() {
1778                 return false
1779         }
1780         if !t.seeding() && !t.needData() {
1781                 return false
1782         }
1783         if len(t.conns) < t.maxEstablishedConns {
1784                 return true
1785         }
1786         return t.worstBadConn() != nil
1787 }
1788
1789 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1790         t.cl.lock()
1791         defer t.cl.unlock()
1792         oldMax = t.maxEstablishedConns
1793         t.maxEstablishedConns = max
1794         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
1795                 return worseConn(&l.Peer, &r.Peer)
1796         })
1797         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1798                 t.dropConnection(wcs.Pop().(*PeerConn))
1799         }
1800         t.openNewConns()
1801         return oldMax
1802 }
1803
1804 func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
1805         t.logger.Log(log.Fstr("hashed piece %d (passed=%t)", piece, passed).SetLevel(log.Debug))
1806         p := t.piece(piece)
1807         p.numVerifies++
1808         t.cl.event.Broadcast()
1809         if t.closed.IsSet() {
1810                 return
1811         }
1812
1813         // Don't score the first time a piece is hashed, it could be an initial check.
1814         if p.storageCompletionOk {
1815                 if passed {
1816                         pieceHashedCorrect.Add(1)
1817                 } else {
1818                         log.Fmsg("piece %d failed hash: %d connections contributed", piece, len(p.dirtiers)).AddValues(t, p).Log(t.logger)
1819                         pieceHashedNotCorrect.Add(1)
1820                 }
1821         }
1822
1823         p.marking = true
1824         t.publishPieceChange(piece)
1825         defer func() {
1826                 p.marking = false
1827                 t.publishPieceChange(piece)
1828         }()
1829
1830         if passed {
1831                 if len(p.dirtiers) != 0 {
1832                         // Don't increment stats above connection-level for every involved connection.
1833                         t.allStats((*ConnStats).incrementPiecesDirtiedGood)
1834                 }
1835                 for c := range p.dirtiers {
1836                         c._stats.incrementPiecesDirtiedGood()
1837                 }
1838                 t.clearPieceTouchers(piece)
1839                 t.cl.unlock()
1840                 err := p.Storage().MarkComplete()
1841                 if err != nil {
1842                         t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1843                 }
1844                 t.cl.lock()
1845
1846                 if t.closed.IsSet() {
1847                         return
1848                 }
1849                 t.pendAllChunkSpecs(piece)
1850         } else {
1851                 if len(p.dirtiers) != 0 && p.allChunksDirty() && hashIoErr == nil {
1852                         // Peers contributed to all the data for this piece hash failure, and the failure was
1853                         // not due to errors in the storage (such as data being dropped in a cache).
1854
1855                         // Increment Torrent and above stats, and then specific connections.
1856                         t.allStats((*ConnStats).incrementPiecesDirtiedBad)
1857                         for c := range p.dirtiers {
1858                                 // Y u do dis peer?!
1859                                 c.stats().incrementPiecesDirtiedBad()
1860                         }
1861
1862                         bannableTouchers := make([]*Peer, 0, len(p.dirtiers))
1863                         for c := range p.dirtiers {
1864                                 if !c.trusted {
1865                                         bannableTouchers = append(bannableTouchers, c)
1866                                 }
1867                         }
1868                         t.clearPieceTouchers(piece)
1869                         slices.Sort(bannableTouchers, connLessTrusted)
1870
1871                         if t.cl.config.Debug {
1872                                 t.logger.Printf(
1873                                         "bannable conns by trust for piece %d: %v",
1874                                         piece,
1875                                         func() (ret []connectionTrust) {
1876                                                 for _, c := range bannableTouchers {
1877                                                         ret = append(ret, c.trust())
1878                                                 }
1879                                                 return
1880                                         }(),
1881                                 )
1882                         }
1883
1884                         if len(bannableTouchers) >= 1 {
1885                                 c := bannableTouchers[0]
1886                                 t.cl.banPeerIP(c.remoteIp())
1887                                 c.drop()
1888                         }
1889                 }
1890                 t.onIncompletePiece(piece)
1891                 p.Storage().MarkNotComplete()
1892         }
1893         t.updatePieceCompletion(piece)
1894 }
1895
1896 func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
1897         // TODO: Make faster
1898         for cn := range t.conns {
1899                 cn.tickleWriter()
1900         }
1901 }
1902
1903 func (t *Torrent) onPieceCompleted(piece pieceIndex) {
1904         t.pendAllChunkSpecs(piece)
1905         t.cancelRequestsForPiece(piece)
1906         for conn := range t.conns {
1907                 conn.have(piece)
1908                 t.maybeDropMutuallyCompletePeer(&conn.Peer)
1909         }
1910 }
1911
1912 // Called when a piece is found to be not complete.
1913 func (t *Torrent) onIncompletePiece(piece pieceIndex) {
1914         if t.pieceAllDirty(piece) {
1915                 t.pendAllChunkSpecs(piece)
1916         }
1917         if !t.wantPieceIndex(piece) {
1918                 // t.logger.Printf("piece %d incomplete and unwanted", piece)
1919                 return
1920         }
1921         // We could drop any connections that we told we have a piece that we
1922         // don't here. But there's a test failure, and it seems clients don't care
1923         // if you request pieces that you already claim to have. Pruning bad
1924         // connections might just remove any connections that aren't treating us
1925         // favourably anyway.
1926
1927         // for c := range t.conns {
1928         //      if c.sentHave(piece) {
1929         //              c.drop()
1930         //      }
1931         // }
1932         t.iterPeers(func(conn *Peer) {
1933                 if conn.peerHasPiece(piece) {
1934                         conn.updateRequests()
1935                 }
1936         })
1937 }
1938
1939 func (t *Torrent) tryCreateMorePieceHashers() {
1940         for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() {
1941         }
1942 }
1943
1944 func (t *Torrent) tryCreatePieceHasher() bool {
1945         if t.storage == nil {
1946                 return false
1947         }
1948         pi, ok := t.getPieceToHash()
1949         if !ok {
1950                 return false
1951         }
1952         p := t.piece(pi)
1953         t.piecesQueuedForHash.Remove(pi)
1954         p.hashing = true
1955         t.publishPieceChange(pi)
1956         t.updatePiecePriority(pi)
1957         t.storageLock.RLock()
1958         t.activePieceHashes++
1959         go t.pieceHasher(pi)
1960         return true
1961 }
1962
1963 func (t *Torrent) getPieceToHash() (ret pieceIndex, ok bool) {
1964         t.piecesQueuedForHash.IterTyped(func(i pieceIndex) bool {
1965                 if t.piece(i).hashing {
1966                         return true
1967                 }
1968                 ret = i
1969                 ok = true
1970                 return false
1971         })
1972         return
1973 }
1974
1975 func (t *Torrent) pieceHasher(index pieceIndex) {
1976         p := t.piece(index)
1977         sum, copyErr := t.hashPiece(index)
1978         correct := sum == *p.hash
1979         switch copyErr {
1980         case nil, io.EOF:
1981         default:
1982                 log.Fmsg("piece %v (%s) hash failure copy error: %v", p, p.hash.HexString(), copyErr).Log(t.logger)
1983         }
1984         t.storageLock.RUnlock()
1985         t.cl.lock()
1986         defer t.cl.unlock()
1987         p.hashing = false
1988         t.updatePiecePriority(index)
1989         t.pieceHashed(index, correct, copyErr)
1990         t.publishPieceChange(index)
1991         t.activePieceHashes--
1992         t.tryCreateMorePieceHashers()
1993 }
1994
1995 // Return the connections that touched a piece, and clear the entries while doing it.
1996 func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
1997         p := t.piece(pi)
1998         for c := range p.dirtiers {
1999                 delete(c.peerTouchedPieces, pi)
2000                 delete(p.dirtiers, c)
2001         }
2002 }
2003
2004 func (t *Torrent) peersAsSlice() (ret []*Peer) {
2005         t.iterPeers(func(p *Peer) {
2006                 ret = append(ret, p)
2007         })
2008         return
2009 }
2010
2011 func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) {
2012         piece := t.piece(pieceIndex)
2013         if piece.queuedForHash() {
2014                 return
2015         }
2016         t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex))
2017         t.publishPieceChange(pieceIndex)
2018         t.updatePiecePriority(pieceIndex)
2019         t.tryCreateMorePieceHashers()
2020 }
2021
2022 // Forces all the pieces to be re-hashed. See also Piece.VerifyData. This should not be called
2023 // before the Info is available.
2024 func (t *Torrent) VerifyData() {
2025         for i := pieceIndex(0); i < t.NumPieces(); i++ {
2026                 t.Piece(i).VerifyData()
2027         }
2028 }
2029
2030 // Start the process of connecting to the given peer for the given torrent if appropriate.
2031 func (t *Torrent) initiateConn(peer PeerInfo) {
2032         if peer.Id == t.cl.peerID {
2033                 return
2034         }
2035         if t.cl.badPeerAddr(peer.Addr) && !peer.Trusted {
2036                 return
2037         }
2038         addr := peer.Addr
2039         if t.addrActive(addr.String()) {
2040                 return
2041         }
2042         t.cl.numHalfOpen++
2043         t.halfOpen[addr.String()] = peer
2044         go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
2045 }
2046
2047 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
2048 // quickly make one Client visible to the Torrent of another Client.
2049 func (t *Torrent) AddClientPeer(cl *Client) int {
2050         return t.AddPeers(func() (ps []PeerInfo) {
2051                 for _, la := range cl.ListenAddrs() {
2052                         ps = append(ps, PeerInfo{
2053                                 Addr:    la,
2054                                 Trusted: true,
2055                         })
2056                 }
2057                 return
2058         }())
2059 }
2060
2061 // All stats that include this Torrent. Useful when we want to increment ConnStats but not for every
2062 // connection.
2063 func (t *Torrent) allStats(f func(*ConnStats)) {
2064         f(&t.stats)
2065         f(&t.cl.stats)
2066 }
2067
2068 func (t *Torrent) hashingPiece(i pieceIndex) bool {
2069         return t.pieces[i].hashing
2070 }
2071
2072 func (t *Torrent) pieceQueuedForHash(i pieceIndex) bool {
2073         return t.piecesQueuedForHash.Get(bitmap.BitIndex(i))
2074 }
2075
2076 func (t *Torrent) dialTimeout() time.Duration {
2077         return reducedDialTimeout(t.cl.config.MinDialTimeout, t.cl.config.NominalDialTimeout, t.cl.config.HalfOpenConnsPerTorrent, t.peers.Len())
2078 }
2079
2080 func (t *Torrent) piece(i int) *Piece {
2081         return &t.pieces[i]
2082 }
2083
2084 func (t *Torrent) onWriteChunkErr(err error) {
2085         if t.userOnWriteChunkErr != nil {
2086                 go t.userOnWriteChunkErr(err)
2087                 return
2088         }
2089         t.logger.WithDefaultLevel(log.Critical).Printf("default chunk write error handler: disabling data download")
2090         t.disallowDataDownloadLocked()
2091 }
2092
2093 func (t *Torrent) DisallowDataDownload() {
2094         t.cl.lock()
2095         defer t.cl.unlock()
2096         t.disallowDataDownloadLocked()
2097 }
2098
2099 func (t *Torrent) disallowDataDownloadLocked() {
2100         t.dataDownloadDisallowed = true
2101         t.iterPeers(func(c *Peer) {
2102                 c.updateRequests()
2103         })
2104         t.tickleReaders()
2105 }
2106
2107 func (t *Torrent) AllowDataDownload() {
2108         t.cl.lock()
2109         defer t.cl.unlock()
2110         t.dataDownloadDisallowed = false
2111         t.tickleReaders()
2112         t.iterPeers(func(c *Peer) {
2113                 c.updateRequests()
2114         })
2115 }
2116
2117 // Enables uploading data, if it was disabled.
2118 func (t *Torrent) AllowDataUpload() {
2119         t.cl.lock()
2120         defer t.cl.unlock()
2121         t.dataUploadDisallowed = false
2122         for c := range t.conns {
2123                 c.updateRequests()
2124         }
2125 }
2126
2127 // Disables uploading data, if it was enabled.
2128 func (t *Torrent) DisallowDataUpload() {
2129         t.cl.lock()
2130         defer t.cl.unlock()
2131         t.dataUploadDisallowed = true
2132         for c := range t.conns {
2133                 c.updateRequests()
2134         }
2135 }
2136
2137 // Sets a handler that is called if there's an error writing a chunk to local storage. By default,
2138 // or if nil, a critical message is logged, and data download is disabled.
2139 func (t *Torrent) SetOnWriteChunkError(f func(error)) {
2140         t.cl.lock()
2141         defer t.cl.unlock()
2142         t.userOnWriteChunkErr = f
2143 }
2144
2145 func (t *Torrent) iterPeers(f func(p *Peer)) {
2146         for pc := range t.conns {
2147                 f(&pc.Peer)
2148         }
2149         for _, ws := range t.webSeeds {
2150                 f(ws)
2151         }
2152 }
2153
2154 func (t *Torrent) callbacks() *Callbacks {
2155         return &t.cl.config.Callbacks
2156 }
2157
2158 var WebseedHttpClient = &http.Client{
2159         Transport: &http.Transport{
2160                 MaxConnsPerHost: 10,
2161         },
2162 }
2163
2164 func (t *Torrent) addWebSeed(url string) {
2165         if t.cl.config.DisableWebseeds {
2166                 return
2167         }
2168         if _, ok := t.webSeeds[url]; ok {
2169                 return
2170         }
2171         const maxRequests = 10
2172         ws := webseedPeer{
2173                 peer: Peer{
2174                         t:                        t,
2175                         outgoing:                 true,
2176                         Network:                  "http",
2177                         reconciledHandshakeStats: true,
2178                         // TODO: Raise this limit, and instead limit concurrent fetches.
2179                         PeerMaxRequests: 32,
2180                         RemoteAddr:      remoteAddrFromUrl(url),
2181                         callbacks:       t.callbacks(),
2182                 },
2183                 client: webseed.Client{
2184                         // Consider a MaxConnsPerHost in the transport for this, possibly in a global Client.
2185                         HttpClient: WebseedHttpClient,
2186                         Url:        url,
2187                 },
2188                 activeRequests: make(map[Request]webseed.Request, maxRequests),
2189         }
2190         ws.requesterCond.L = t.cl.locker()
2191         for range iter.N(maxRequests) {
2192                 go ws.requester()
2193         }
2194         for _, f := range t.callbacks().NewPeer {
2195                 f(&ws.peer)
2196         }
2197         ws.peer.logger = t.logger.WithContextValue(&ws)
2198         ws.peer.peerImpl = &ws
2199         if t.haveInfo() {
2200                 ws.onGotInfo(t.info)
2201         }
2202         t.webSeeds[url] = &ws.peer
2203         ws.peer.onPeerHasAllPieces()
2204 }
2205
2206 func (t *Torrent) peerIsActive(p *Peer) (active bool) {
2207         t.iterPeers(func(p1 *Peer) {
2208                 if p1 == p {
2209                         active = true
2210                 }
2211         })
2212         return
2213 }