]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
New slices package
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "errors"
6         "fmt"
7         "io"
8         "log"
9         "math"
10         "math/rand"
11         "net"
12         "os"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/missinggo"
17         "github.com/anacrolix/missinggo/bitmap"
18         "github.com/anacrolix/missinggo/itertools"
19         "github.com/anacrolix/missinggo/perf"
20         "github.com/anacrolix/missinggo/pubsub"
21         "github.com/anacrolix/missinggo/slices"
22         "github.com/bradfitz/iter"
23
24         "github.com/anacrolix/torrent/bencode"
25         "github.com/anacrolix/torrent/metainfo"
26         pp "github.com/anacrolix/torrent/peer_protocol"
27         "github.com/anacrolix/torrent/storage"
28         "github.com/anacrolix/torrent/tracker"
29 )
30
31 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
32         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
33 }
34
35 type peersKey struct {
36         IPBytes string
37         Port    int
38 }
39
40 // Maintains state of torrent within a Client.
41 type Torrent struct {
42         cl *Client
43
44         closed   missinggo.Event
45         infoHash metainfo.Hash
46         pieces   []piece
47         // Values are the piece indices that changed.
48         pieceStateChanges *pubsub.PubSub
49         chunkSize         pp.Integer
50         // Total length of the torrent in bytes. Stored because it's not O(1) to
51         // get this from the info dict.
52         length int64
53
54         // The storage to open when the info dict becomes available.
55         storageOpener storage.Client
56         // Storage for torrent data.
57         storage storage.Torrent
58
59         metainfo metainfo.MetaInfo
60
61         // The info dict. nil if we don't have it (yet).
62         info *metainfo.InfoEx
63         // Active peer connections, running message stream loops.
64         conns               []*connection
65         maxEstablishedConns int
66         // Set of addrs to which we're attempting to connect. Connections are
67         // half-open until all handshakes are completed.
68         halfOpen map[string]struct{}
69
70         // Reserve of peers to connect to. A peer can be both here and in the
71         // active connections if were told about the peer after connecting with
72         // them. That encourages us to reconnect to peers that are well known.
73         peers          map[peersKey]Peer
74         wantPeersEvent missinggo.Event
75         // An announcer for each tracker URL.
76         trackerAnnouncers map[string]*trackerScraper
77         // How many times we've initiated a DHT announce.
78         numDHTAnnounces int
79
80         // Name used if the info name isn't available.
81         displayName string
82         // The bencoded bytes of the info dict.
83         metadataBytes []byte
84         // Each element corresponds to the 16KiB metadata pieces. If true, we have
85         // received that piece.
86         metadataCompletedChunks []bool
87
88         // Set when .Info is obtained.
89         gotMetainfo missinggo.Event
90
91         readers map[*Reader]struct{}
92
93         pendingPieces   bitmap.Bitmap
94         completedPieces bitmap.Bitmap
95
96         connPieceInclinationPool sync.Pool
97         stats                    TorrentStats
98 }
99
100 func (t *Torrent) setDisplayName(dn string) {
101         if t.haveInfo() {
102                 return
103         }
104         t.displayName = dn
105 }
106
107 func (t *Torrent) pieceComplete(piece int) bool {
108         return t.completedPieces.Get(piece)
109 }
110
111 func (t *Torrent) pieceCompleteUncached(piece int) bool {
112         return t.pieces[piece].Storage().GetIsComplete()
113 }
114
115 func (t *Torrent) numConnsUnchoked() (num int) {
116         for _, c := range t.conns {
117                 if !c.PeerChoked {
118                         num++
119                 }
120         }
121         return
122 }
123
124 // There's a connection to that address already.
125 func (t *Torrent) addrActive(addr string) bool {
126         if _, ok := t.halfOpen[addr]; ok {
127                 return true
128         }
129         for _, c := range t.conns {
130                 if c.remoteAddr().String() == addr {
131                         return true
132                 }
133         }
134         return false
135 }
136
137 func (t *Torrent) worstUnclosedConns() (ret []*connection) {
138         ret = make([]*connection, 0, len(t.conns))
139         for _, c := range t.conns {
140                 if !c.closed.IsSet() {
141                         ret = append(ret, c)
142                 }
143         }
144         return
145 }
146
147 func (t *Torrent) addPeer(p Peer) {
148         cl := t.cl
149         cl.openNewConns(t)
150         if len(t.peers) >= torrentPeersHighWater {
151                 return
152         }
153         key := peersKey{string(p.IP), p.Port}
154         if _, ok := t.peers[key]; ok {
155                 return
156         }
157         t.peers[key] = p
158         peersAddedBySource.Add(string(p.Source), 1)
159         cl.openNewConns(t)
160
161 }
162
163 func (t *Torrent) invalidateMetadata() {
164         t.metadataBytes = nil
165         t.metadataCompletedChunks = nil
166         t.info = nil
167 }
168
169 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
170         if t.haveInfo() {
171                 return
172         }
173         if index >= len(t.metadataCompletedChunks) {
174                 log.Printf("%s: ignoring metadata piece %d", t, index)
175                 return
176         }
177         copy(t.metadataBytes[(1<<14)*index:], data)
178         t.metadataCompletedChunks[index] = true
179 }
180
181 func (t *Torrent) metadataPieceCount() int {
182         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
183 }
184
185 func (t *Torrent) haveMetadataPiece(piece int) bool {
186         if t.haveInfo() {
187                 return (1<<14)*piece < len(t.metadataBytes)
188         } else {
189                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
190         }
191 }
192
193 func (t *Torrent) metadataSizeKnown() bool {
194         return t.metadataBytes != nil
195 }
196
197 func (t *Torrent) metadataSize() int {
198         return len(t.metadataBytes)
199 }
200
201 func infoPieceHashes(info *metainfo.Info) (ret []string) {
202         for i := 0; i < len(info.Pieces); i += 20 {
203                 ret = append(ret, string(info.Pieces[i:i+20]))
204         }
205         return
206 }
207
208 // Called when metadata for a torrent becomes available.
209 func (t *Torrent) setInfoBytes(b []byte) error {
210         if t.haveInfo() {
211                 return nil
212         }
213         var ie *metainfo.InfoEx
214         err := bencode.Unmarshal(b, &ie)
215         if err != nil {
216                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
217         }
218         if ie.Hash() != t.infoHash {
219                 return errors.New("info bytes have wrong hash")
220         }
221         err = validateInfo(&ie.Info)
222         if err != nil {
223                 return fmt.Errorf("bad info: %s", err)
224         }
225         defer t.updateWantPeersEvent()
226         t.info = ie
227         t.displayName = "" // Save a few bytes lol.
228         t.cl.event.Broadcast()
229         t.gotMetainfo.Set()
230         t.storage, err = t.storageOpener.OpenTorrent(t.info)
231         if err != nil {
232                 return fmt.Errorf("error opening torrent storage: %s", err)
233         }
234         t.length = 0
235         for _, f := range t.info.UpvertedFiles() {
236                 t.length += f.Length
237         }
238         t.metadataBytes = b
239         t.metadataCompletedChunks = nil
240         hashes := infoPieceHashes(&t.info.Info)
241         t.pieces = make([]piece, len(hashes))
242         for i, hash := range hashes {
243                 piece := &t.pieces[i]
244                 piece.t = t
245                 piece.index = i
246                 piece.noPendingWrites.L = &piece.pendingWritesMutex
247                 missinggo.CopyExact(piece.Hash[:], hash)
248         }
249         for _, conn := range t.conns {
250                 if err := conn.setNumPieces(t.numPieces()); err != nil {
251                         log.Printf("closing connection: %s", err)
252                         conn.Close()
253                 }
254         }
255         for i := range t.pieces {
256                 t.updatePieceCompletion(i)
257                 t.pieces[i].QueuedForHash = true
258         }
259         go func() {
260                 for i := range t.pieces {
261                         t.verifyPiece(i)
262                 }
263         }()
264         return nil
265 }
266
267 func (t *Torrent) verifyPiece(piece int) {
268         t.cl.verifyPiece(t, piece)
269 }
270
271 func (t *Torrent) haveAllMetadataPieces() bool {
272         if t.haveInfo() {
273                 return true
274         }
275         if t.metadataCompletedChunks == nil {
276                 return false
277         }
278         for _, have := range t.metadataCompletedChunks {
279                 if !have {
280                         return false
281                 }
282         }
283         return true
284 }
285
286 // TODO: Propagate errors to disconnect peer.
287 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
288         if t.haveInfo() {
289                 // We already know the correct metadata size.
290                 return
291         }
292         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
293                 return errors.New("bad size")
294         }
295         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
296                 return
297         }
298         t.metadataBytes = make([]byte, bytes)
299         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
300         for _, c := range t.conns {
301                 c.requestPendingMetadata()
302         }
303         return
304 }
305
306 // The current working name for the torrent. Either the name in the info dict,
307 // or a display name given such as by the dn value in a magnet link, or "".
308 func (t *Torrent) name() string {
309         if t.haveInfo() {
310                 return t.info.Name
311         }
312         return t.displayName
313 }
314
315 func (t *Torrent) pieceState(index int) (ret PieceState) {
316         p := &t.pieces[index]
317         ret.Priority = t.piecePriority(index)
318         if t.pieceComplete(index) {
319                 ret.Complete = true
320         }
321         if p.QueuedForHash || p.Hashing {
322                 ret.Checking = true
323         }
324         if !ret.Complete && t.piecePartiallyDownloaded(index) {
325                 ret.Partial = true
326         }
327         return
328 }
329
330 func (t *Torrent) metadataPieceSize(piece int) int {
331         return metadataPieceSize(len(t.metadataBytes), piece)
332 }
333
334 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
335         d := map[string]int{
336                 "msg_type": msgType,
337                 "piece":    piece,
338         }
339         if data != nil {
340                 d["total_size"] = len(t.metadataBytes)
341         }
342         p, err := bencode.Marshal(d)
343         if err != nil {
344                 panic(err)
345         }
346         return pp.Message{
347                 Type:            pp.Extended,
348                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
349                 ExtendedPayload: append(p, data...),
350         }
351 }
352
353 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
354         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
355                 ret = append(ret, PieceStateRun{
356                         PieceState: el.(PieceState),
357                         Length:     int(count),
358                 })
359         })
360         for index := range t.pieces {
361                 rle.Append(t.pieceState(index), 1)
362         }
363         rle.Flush()
364         return
365 }
366
367 // Produces a small string representing a PieceStateRun.
368 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
369         ret = fmt.Sprintf("%d", psr.Length)
370         ret += func() string {
371                 switch psr.Priority {
372                 case PiecePriorityNext:
373                         return "N"
374                 case PiecePriorityNormal:
375                         return "."
376                 case PiecePriorityReadahead:
377                         return "R"
378                 case PiecePriorityNow:
379                         return "!"
380                 default:
381                         return ""
382                 }
383         }()
384         if psr.Checking {
385                 ret += "H"
386         }
387         if psr.Partial {
388                 ret += "P"
389         }
390         if psr.Complete {
391                 ret += "C"
392         }
393         return
394 }
395
396 func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
397         fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
398         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
399         if !t.haveInfo() {
400                 fmt.Fprintf(w, "Metadata have: ")
401                 for _, h := range t.metadataCompletedChunks {
402                         fmt.Fprintf(w, "%c", func() rune {
403                                 if h {
404                                         return 'H'
405                                 } else {
406                                         return '.'
407                                 }
408                         }())
409                 }
410                 fmt.Fprintln(w)
411         }
412         fmt.Fprintf(w, "Piece length: %s\n", func() string {
413                 if t.haveInfo() {
414                         return fmt.Sprint(t.usualPieceSize())
415                 } else {
416                         return "?"
417                 }
418         }())
419         if t.haveInfo() {
420                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
421                 fmt.Fprint(w, "Piece States:")
422                 for _, psr := range t.pieceStateRuns() {
423                         w.Write([]byte(" "))
424                         w.Write([]byte(pieceStateRunStatusChars(psr)))
425                 }
426                 fmt.Fprintln(w)
427         }
428         fmt.Fprintf(w, "Reader Pieces:")
429         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
430                 fmt.Fprintf(w, " %d:%d", begin, end)
431                 return true
432         })
433         fmt.Fprintln(w)
434
435         fmt.Fprintf(w, "Trackers: ")
436         for _url := range t.trackerAnnouncers {
437                 fmt.Fprintf(w, "%q ", _url)
438         }
439         fmt.Fprintf(w, "\n")
440
441         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
442
443         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
444         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
445         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
446         slices.Sort(t.conns, worseConn)
447         for i, c := range t.conns {
448                 fmt.Fprintf(w, "%2d. ", i+1)
449                 c.WriteStatus(w, t)
450         }
451 }
452
453 func (t *Torrent) haveInfo() bool {
454         return t.info != nil
455 }
456
457 // TODO: Include URIs that weren't converted to tracker clients.
458 func (t *Torrent) announceList() (al [][]string) {
459         return t.metainfo.AnnounceList
460 }
461
462 // Returns a run-time generated MetaInfo that includes the info bytes and
463 // announce-list as currently known to the client.
464 func (t *Torrent) newMetaInfo() (mi *metainfo.MetaInfo) {
465         mi = &metainfo.MetaInfo{
466                 CreationDate: time.Now().Unix(),
467                 Comment:      "dynamic metainfo from client",
468                 CreatedBy:    "go.torrent",
469                 AnnounceList: t.announceList(),
470         }
471         if t.info != nil {
472                 mi.Info = *t.info
473         }
474         return
475 }
476
477 func (t *Torrent) bytesLeft() (left int64) {
478         for i := 0; i < t.numPieces(); i++ {
479                 left += int64(t.pieces[i].bytesLeft())
480         }
481         return
482 }
483
484 // Bytes left to give in tracker announces.
485 func (t *Torrent) bytesLeftAnnounce() uint64 {
486         if t.haveInfo() {
487                 return uint64(t.bytesLeft())
488         } else {
489                 return math.MaxUint64
490         }
491 }
492
493 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
494         if t.pieceComplete(piece) {
495                 return false
496         }
497         if t.pieceAllDirty(piece) {
498                 return false
499         }
500         return t.pieces[piece].hasDirtyChunks()
501 }
502
503 func (t *Torrent) usualPieceSize() int {
504         return int(t.info.PieceLength)
505 }
506
507 func (t *Torrent) lastPieceSize() int {
508         return int(t.pieceLength(t.numPieces() - 1))
509 }
510
511 func (t *Torrent) numPieces() int {
512         return t.info.NumPieces()
513 }
514
515 func (t *Torrent) numPiecesCompleted() (num int) {
516         return t.completedPieces.Len()
517 }
518
519 func (t *Torrent) close() (err error) {
520         t.closed.Set()
521         if c, ok := t.storage.(io.Closer); ok {
522                 c.Close()
523         }
524         for _, conn := range t.conns {
525                 conn.Close()
526         }
527         t.pieceStateChanges.Close()
528         t.updateWantPeersEvent()
529         return
530 }
531
532 func (t *Torrent) requestOffset(r request) int64 {
533         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
534 }
535
536 // Return the request that would include the given offset into the torrent
537 // data. Returns !ok if there is no such request.
538 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
539         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
540 }
541
542 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
543         tr := perf.NewTimer()
544
545         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
546         if err == nil && n != len(data) {
547                 err = io.ErrShortWrite
548         }
549         if err == nil {
550                 tr.Stop("write chunk")
551         }
552         return
553 }
554
555 func (t *Torrent) bitfield() (bf []bool) {
556         bf = make([]bool, t.numPieces())
557         t.completedPieces.IterTyped(func(piece int) (again bool) {
558                 bf[piece] = true
559                 return true
560         })
561         return
562 }
563
564 func (t *Torrent) validOutgoingRequest(r request) bool {
565         if r.Index >= pp.Integer(t.info.NumPieces()) {
566                 return false
567         }
568         if r.Begin%t.chunkSize != 0 {
569                 return false
570         }
571         if r.Length > t.chunkSize {
572                 return false
573         }
574         pieceLength := t.pieceLength(int(r.Index))
575         if r.Begin+r.Length > pieceLength {
576                 return false
577         }
578         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
579 }
580
581 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
582         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
583         var cs chunkSpec
584         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
585                 cs.Length = left
586                 if cs.Length > t.chunkSize {
587                         cs.Length = t.chunkSize
588                 }
589                 css = append(css, cs)
590                 cs.Begin += cs.Length
591         }
592         return
593 }
594
595 func (t *Torrent) pieceNumChunks(piece int) int {
596         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
597 }
598
599 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
600         t.pieces[pieceIndex].DirtyChunks.Clear()
601 }
602
603 type Peer struct {
604         Id     [20]byte
605         IP     net.IP
606         Port   int
607         Source peerSource
608         // Peer is known to support encryption.
609         SupportsEncryption bool
610 }
611
612 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
613         if piece < 0 || piece >= t.info.NumPieces() {
614                 return
615         }
616         if piece == t.numPieces()-1 {
617                 len_ = pp.Integer(t.length % t.info.PieceLength)
618         }
619         if len_ == 0 {
620                 len_ = pp.Integer(t.info.PieceLength)
621         }
622         return
623 }
624
625 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
626         hash := pieceHash.New()
627         p := &t.pieces[piece]
628         p.waitNoPendingWrites()
629         ip := t.info.Piece(piece)
630         pl := ip.Length()
631         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
632         if n == pl {
633                 missinggo.CopyExact(&ret, hash.Sum(nil))
634                 return
635         }
636         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
637                 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
638         }
639         return
640 }
641
642 func (t *Torrent) haveAllPieces() bool {
643         if !t.haveInfo() {
644                 return false
645         }
646         return t.completedPieces.Len() == t.numPieces()
647 }
648
649 func (t *Torrent) haveAnyPieces() bool {
650         for i := range t.pieces {
651                 if t.pieceComplete(i) {
652                         return true
653                 }
654         }
655         return false
656 }
657
658 func (t *Torrent) havePiece(index int) bool {
659         return t.haveInfo() && t.pieceComplete(index)
660 }
661
662 func (t *Torrent) haveChunk(r request) (ret bool) {
663         // defer func() {
664         //      log.Println("have chunk", r, ret)
665         // }()
666         if !t.haveInfo() {
667                 return false
668         }
669         if t.pieceComplete(int(r.Index)) {
670                 return true
671         }
672         p := &t.pieces[r.Index]
673         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
674 }
675
676 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
677         return int(cs.Begin / chunkSize)
678 }
679
680 func (t *Torrent) wantPiece(r request) bool {
681         if !t.wantPieceIndex(int(r.Index)) {
682                 return false
683         }
684         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
685                 return true
686         }
687         // TODO: What about pieces that were wanted, but aren't now, and aren't
688         // completed either? That used to be done here.
689         return false
690 }
691
692 func (t *Torrent) wantPieceIndex(index int) bool {
693         if !t.haveInfo() {
694                 return false
695         }
696         p := &t.pieces[index]
697         if p.QueuedForHash {
698                 return false
699         }
700         if p.Hashing {
701                 return false
702         }
703         if t.pieceComplete(index) {
704                 return false
705         }
706         if t.pendingPieces.Contains(index) {
707                 return true
708         }
709         return !t.forReaderOffsetPieces(func(begin, end int) bool {
710                 return index < begin || index >= end
711         })
712 }
713
714 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
715         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
716                 for i := begin; begin < end; i++ {
717                         if !f(i) {
718                                 return false
719                         }
720                 }
721                 return true
722         })
723 }
724
725 func (t *Torrent) connHasWantedPieces(c *connection) bool {
726         return !c.pieceRequestOrder.IsEmpty()
727 }
728
729 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
730         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
731                 pieces = append(pieces, int(i))
732         }
733         return
734 }
735
736 // The worst connection is one that hasn't been sent, or sent anything useful
737 // for the longest. A bad connection is one that usually sends us unwanted
738 // pieces, or has been in worser half of the established connections for more
739 // than a minute.
740 func (t *Torrent) worstBadConn() *connection {
741         wcs := slices.AsHeap(t.worstUnclosedConns(), worseConn)
742         for wcs.Len() != 0 {
743                 c := heap.Pop(wcs).(*connection)
744                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
745                         return c
746                 }
747                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
748                         // Give connections 1 minute to prove themselves.
749                         if time.Since(c.completedHandshake) > time.Minute {
750                                 return c
751                         }
752                 }
753         }
754         return nil
755 }
756
757 type PieceStateChange struct {
758         Index int
759         PieceState
760 }
761
762 func (t *Torrent) publishPieceChange(piece int) {
763         cur := t.pieceState(piece)
764         p := &t.pieces[piece]
765         if cur != p.PublicPieceState {
766                 p.PublicPieceState = cur
767                 t.pieceStateChanges.Publish(PieceStateChange{
768                         piece,
769                         cur,
770                 })
771         }
772 }
773
774 func (t *Torrent) pieceNumPendingChunks(piece int) int {
775         if t.pieceComplete(piece) {
776                 return 0
777         }
778         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
779 }
780
781 func (t *Torrent) pieceAllDirty(piece int) bool {
782         return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
783 }
784
785 func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
786         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
787                 if begin < end {
788                         if !f(begin) {
789                                 return false
790                         }
791                 }
792                 return true
793         })
794 }
795
796 func (t *Torrent) readersChanged() {
797         t.updatePiecePriorities()
798 }
799
800 func (t *Torrent) maybeNewConns() {
801         // Tickle the accept routine.
802         t.cl.event.Broadcast()
803         t.openNewConns()
804 }
805
806 func (t *Torrent) piecePriorityChanged(piece int) {
807         for _, c := range t.conns {
808                 c.updatePiecePriority(piece)
809         }
810         t.maybeNewConns()
811         t.publishPieceChange(piece)
812 }
813
814 func (t *Torrent) updatePiecePriority(piece int) bool {
815         p := &t.pieces[piece]
816         newPrio := t.piecePriorityUncached(piece)
817         if newPrio == p.priority {
818                 return false
819         }
820         p.priority = newPrio
821         return true
822 }
823
824 // Update all piece priorities in one hit. This function should have the same
825 // output as updatePiecePriority, but across all pieces.
826 func (t *Torrent) updatePiecePriorities() {
827         newPrios := make([]piecePriority, t.numPieces())
828         t.pendingPieces.IterTyped(func(piece int) (more bool) {
829                 newPrios[piece] = PiecePriorityNormal
830                 return true
831         })
832         t.forReaderOffsetPieces(func(begin, end int) (next bool) {
833                 if begin < end {
834                         newPrios[begin].Raise(PiecePriorityNow)
835                 }
836                 for i := begin + 1; i < end; i++ {
837                         newPrios[i].Raise(PiecePriorityReadahead)
838                 }
839                 return true
840         })
841         t.completedPieces.IterTyped(func(piece int) (more bool) {
842                 newPrios[piece] = PiecePriorityNone
843                 return true
844         })
845         for i, prio := range newPrios {
846                 if prio != t.pieces[i].priority {
847                         t.pieces[i].priority = prio
848                         t.piecePriorityChanged(i)
849                 }
850         }
851 }
852
853 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
854         if off >= t.length {
855                 return
856         }
857         if off < 0 {
858                 size += off
859                 off = 0
860         }
861         if size <= 0 {
862                 return
863         }
864         begin = int(off / t.info.PieceLength)
865         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
866         if end > t.info.NumPieces() {
867                 end = t.info.NumPieces()
868         }
869         return
870 }
871
872 // Returns true if all iterations complete without breaking.
873 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
874         // There's an oppurtunity here to build a map of beginning pieces, and a
875         // bitmap of the rest. I wonder if it's worth the allocation overhead.
876         for r := range t.readers {
877                 r.mu.Lock()
878                 pos, readahead := r.pos, r.readahead
879                 r.mu.Unlock()
880                 if readahead < 1 {
881                         readahead = 1
882                 }
883                 begin, end := t.byteRegionPieces(pos, readahead)
884                 if begin >= end {
885                         continue
886                 }
887                 if !f(begin, end) {
888                         return false
889                 }
890         }
891         return true
892 }
893
894 func (t *Torrent) piecePriority(piece int) piecePriority {
895         if !t.haveInfo() {
896                 return PiecePriorityNone
897         }
898         return t.pieces[piece].priority
899 }
900
901 func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
902         ret = PiecePriorityNone
903         if t.pieceComplete(piece) {
904                 return
905         }
906         if t.pendingPieces.Contains(piece) {
907                 ret = PiecePriorityNormal
908         }
909         raiseRet := ret.Raise
910         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
911                 if piece == begin {
912                         raiseRet(PiecePriorityNow)
913                 }
914                 if begin <= piece && piece < end {
915                         raiseRet(PiecePriorityReadahead)
916                 }
917                 return true
918         })
919         return
920 }
921
922 func (t *Torrent) pendPiece(piece int) {
923         if t.pendingPieces.Contains(piece) {
924                 return
925         }
926         if t.havePiece(piece) {
927                 return
928         }
929         t.pendingPieces.Add(piece)
930         if !t.updatePiecePriority(piece) {
931                 return
932         }
933         t.piecePriorityChanged(piece)
934 }
935
936 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
937         return t.completedPieces.Copy()
938 }
939
940 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
941         t.pendingPieces.Sub(unpend)
942         t.updatePiecePriorities()
943 }
944
945 func (t *Torrent) pendPieceRange(begin, end int) {
946         for i := begin; i < end; i++ {
947                 t.pendPiece(i)
948         }
949 }
950
951 func (t *Torrent) unpendPieceRange(begin, end int) {
952         var bm bitmap.Bitmap
953         bm.AddRange(begin, end)
954         t.unpendPieces(&bm)
955 }
956
957 func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
958         if !c.PeerHasPiece(piece) {
959                 return true
960         }
961         chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
962         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
963                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
964                 return c.Request(req)
965         })
966 }
967
968 func (t *Torrent) pendRequest(req request) {
969         ci := chunkIndex(req.chunkSpec, t.chunkSize)
970         t.pieces[req.Index].pendChunkIndex(ci)
971 }
972
973 func (t *Torrent) pieceChanged(piece int) {
974         t.cl.pieceChanged(t, piece)
975 }
976
977 func (t *Torrent) openNewConns() {
978         t.cl.openNewConns(t)
979 }
980
981 func (t *Torrent) getConnPieceInclination() []int {
982         _ret := t.connPieceInclinationPool.Get()
983         if _ret == nil {
984                 pieceInclinationsNew.Add(1)
985                 return rand.Perm(t.numPieces())
986         }
987         pieceInclinationsReused.Add(1)
988         return _ret.([]int)
989 }
990
991 func (t *Torrent) putPieceInclination(pi []int) {
992         t.connPieceInclinationPool.Put(pi)
993         pieceInclinationsPut.Add(1)
994 }
995
996 func (t *Torrent) updatePieceCompletion(piece int) {
997         pcu := t.pieceCompleteUncached(piece)
998         changed := t.completedPieces.Get(piece) != pcu
999         t.completedPieces.Set(piece, pcu)
1000         if changed {
1001                 t.pieceChanged(piece)
1002         }
1003 }
1004
1005 // Non-blocking read. Client lock is not required.
1006 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1007         p := &t.pieces[off/t.info.PieceLength]
1008         p.waitNoPendingWrites()
1009         return p.Storage().ReadAt(b, off-p.Info().Offset())
1010 }
1011
1012 func (t *Torrent) updateAllPieceCompletions() {
1013         for i := range iter.N(t.numPieces()) {
1014                 t.updatePieceCompletion(i)
1015         }
1016 }
1017
1018 func (t *Torrent) maybeMetadataCompleted() {
1019         if t.haveInfo() {
1020                 // Nothing to do.
1021                 return
1022         }
1023         if !t.haveAllMetadataPieces() {
1024                 // Don't have enough metadata pieces.
1025                 return
1026         }
1027         // TODO(anacrolix): If this fails, I think something harsher should be
1028         // done.
1029         err := t.setInfoBytes(t.metadataBytes)
1030         if err != nil {
1031                 log.Printf("error setting metadata: %s", err)
1032                 t.invalidateMetadata()
1033                 return
1034         }
1035         if t.cl.config.Debug {
1036                 log.Printf("%s: got metadata from peers", t)
1037         }
1038 }
1039
1040 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1041         t.forReaderOffsetPieces(func(begin, end int) bool {
1042                 ret.AddRange(begin, end)
1043                 return true
1044         })
1045         return
1046 }
1047
1048 func (t *Torrent) needData() bool {
1049         if !t.haveInfo() {
1050                 return true
1051         }
1052         if t.pendingPieces.Len() != 0 {
1053                 return true
1054         }
1055         return !t.readerPieces().IterTyped(func(piece int) bool {
1056                 return t.pieceComplete(piece)
1057         })
1058 }
1059
1060 func appendMissingStrings(old, new []string) (ret []string) {
1061         ret = old
1062 new:
1063         for _, n := range new {
1064                 for _, o := range old {
1065                         if o == n {
1066                                 continue new
1067                         }
1068                 }
1069                 ret = append(ret, n)
1070         }
1071         return
1072 }
1073
1074 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1075         ret = existing
1076         for minNumTiers > len(ret) {
1077                 ret = append(ret, nil)
1078         }
1079         return
1080 }
1081
1082 func (t *Torrent) addTrackers(announceList [][]string) {
1083         fullAnnounceList := &t.metainfo.AnnounceList
1084         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1085         for tierIndex, trackerURLs := range announceList {
1086                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1087         }
1088         t.startMissingTrackerScrapers()
1089         t.updateWantPeersEvent()
1090 }
1091
1092 // Don't call this before the info is available.
1093 func (t *Torrent) bytesCompleted() int64 {
1094         if !t.haveInfo() {
1095                 return 0
1096         }
1097         return t.info.TotalLength() - t.bytesLeft()
1098 }
1099
1100 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1101         t.cl.mu.Lock()
1102         defer t.cl.mu.Unlock()
1103         return t.setInfoBytes(b)
1104 }
1105
1106 // Returns true if connection is removed from torrent.Conns.
1107 func (t *Torrent) deleteConnection(c *connection) bool {
1108         for i0, _c := range t.conns {
1109                 if _c != c {
1110                         continue
1111                 }
1112                 i1 := len(t.conns) - 1
1113                 if i0 != i1 {
1114                         t.conns[i0] = t.conns[i1]
1115                 }
1116                 t.conns = t.conns[:i1]
1117                 return true
1118         }
1119         return false
1120 }
1121
1122 func (t *Torrent) dropConnection(c *connection) {
1123         t.cl.event.Broadcast()
1124         c.Close()
1125         if t.deleteConnection(c) {
1126                 t.openNewConns()
1127         }
1128 }
1129
1130 func (t *Torrent) wantPeers() bool {
1131         if t.closed.IsSet() {
1132                 return false
1133         }
1134         if len(t.peers) > torrentPeersLowWater {
1135                 return false
1136         }
1137         return t.needData() || t.seeding()
1138 }
1139
1140 func (t *Torrent) updateWantPeersEvent() {
1141         if t.wantPeers() {
1142                 t.wantPeersEvent.Set()
1143         } else {
1144                 t.wantPeersEvent.Clear()
1145         }
1146 }
1147
1148 // Returns whether the client should make effort to seed the torrent.
1149 func (t *Torrent) seeding() bool {
1150         cl := t.cl
1151         if cl.config.NoUpload {
1152                 return false
1153         }
1154         if !cl.config.Seed {
1155                 return false
1156         }
1157         if t.needData() {
1158                 return false
1159         }
1160         return true
1161 }
1162
1163 // Adds and starts tracker scrapers for tracker URLs that aren't already
1164 // running.
1165 func (t *Torrent) startMissingTrackerScrapers() {
1166         if t.cl.config.DisableTrackers {
1167                 return
1168         }
1169         for _, tier := range t.announceList() {
1170                 for _, trackerURL := range tier {
1171                         if _, ok := t.trackerAnnouncers[trackerURL]; ok {
1172                                 continue
1173                         }
1174                         newAnnouncer := &trackerScraper{
1175                                 url: trackerURL,
1176                                 t:   t,
1177                         }
1178                         if t.trackerAnnouncers == nil {
1179                                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1180                         }
1181                         t.trackerAnnouncers[trackerURL] = newAnnouncer
1182                         go newAnnouncer.Run()
1183                 }
1184         }
1185 }
1186
1187 // Returns an AnnounceRequest with fields filled out to defaults and current
1188 // values.
1189 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1190         return tracker.AnnounceRequest{
1191                 Event:    tracker.None,
1192                 NumWant:  -1,
1193                 Port:     uint16(t.cl.incomingPeerPort()),
1194                 PeerId:   t.cl.peerID,
1195                 InfoHash: t.infoHash,
1196                 Left:     t.bytesLeftAnnounce(),
1197         }
1198 }
1199
1200 func (t *Torrent) announceDHT(impliedPort bool) {
1201         cl := t.cl
1202         for {
1203                 select {
1204                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1205                 case <-t.closed.LockedChan(&cl.mu):
1206                         return
1207                 }
1208                 // log.Printf("getting peers for %q from DHT", t)
1209                 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1210                 if err != nil {
1211                         log.Printf("error getting peers from dht: %s", err)
1212                         return
1213                 }
1214                 cl.mu.Lock()
1215                 t.numDHTAnnounces++
1216                 cl.mu.Unlock()
1217                 // Count all the unique addresses we got during this announce.
1218                 allAddrs := make(map[string]struct{})
1219         getPeers:
1220                 for {
1221                         select {
1222                         case v, ok := <-ps.Peers:
1223                                 if !ok {
1224                                         break getPeers
1225                                 }
1226                                 addPeers := make([]Peer, 0, len(v.Peers))
1227                                 for _, cp := range v.Peers {
1228                                         if cp.Port == 0 {
1229                                                 // Can't do anything with this.
1230                                                 continue
1231                                         }
1232                                         addPeers = append(addPeers, Peer{
1233                                                 IP:     cp.IP[:],
1234                                                 Port:   cp.Port,
1235                                                 Source: peerSourceDHT,
1236                                         })
1237                                         key := (&net.UDPAddr{
1238                                                 IP:   cp.IP[:],
1239                                                 Port: cp.Port,
1240                                         }).String()
1241                                         allAddrs[key] = struct{}{}
1242                                 }
1243                                 cl.mu.Lock()
1244                                 t.addPeers(addPeers)
1245                                 numPeers := len(t.peers)
1246                                 cl.mu.Unlock()
1247                                 if numPeers >= torrentPeersHighWater {
1248                                         break getPeers
1249                                 }
1250                         case <-t.closed.LockedChan(&cl.mu):
1251                                 ps.Close()
1252                                 return
1253                         }
1254                 }
1255                 ps.Close()
1256                 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
1257         }
1258 }
1259
1260 func (t *Torrent) addPeers(peers []Peer) {
1261         for _, p := range peers {
1262                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1263                         continue
1264                 }
1265                 t.addPeer(p)
1266         }
1267 }
1268
1269 func (t *Torrent) Stats() TorrentStats {
1270         return t.stats
1271 }
1272
1273 // Returns true if the connection is added.
1274 func (t *Torrent) addConnection(c *connection) bool {
1275         if t.cl.closed.IsSet() {
1276                 return false
1277         }
1278         if !t.wantConns() {
1279                 return false
1280         }
1281         for _, c0 := range t.conns {
1282                 if c.PeerID == c0.PeerID {
1283                         // Already connected to a client with that ID.
1284                         duplicateClientConns.Add(1)
1285                         return false
1286                 }
1287         }
1288         if len(t.conns) >= t.maxEstablishedConns {
1289                 c := t.worstBadConn()
1290                 if c == nil {
1291                         return false
1292                 }
1293                 if t.cl.config.Debug && missinggo.CryHeard() {
1294                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1295                 }
1296                 c.Close()
1297                 t.deleteConnection(c)
1298         }
1299         if len(t.conns) >= t.maxEstablishedConns {
1300                 panic(len(t.conns))
1301         }
1302         t.conns = append(t.conns, c)
1303         c.t = t
1304         return true
1305 }
1306
1307 func (t *Torrent) wantConns() bool {
1308         if !t.seeding() && !t.needData() {
1309                 return false
1310         }
1311         if len(t.conns) < t.maxEstablishedConns {
1312                 return true
1313         }
1314         return t.worstBadConn() != nil
1315 }
1316
1317 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1318         t.cl.mu.Lock()
1319         defer t.cl.mu.Unlock()
1320         oldMax = t.maxEstablishedConns
1321         t.maxEstablishedConns = max
1322         wcs := slices.AsHeap(append([]*connection(nil), t.conns...), worseConn)
1323         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1324                 t.dropConnection(wcs.Pop().(*connection))
1325         }
1326         t.openNewConns()
1327         return oldMax
1328 }