]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Quick fix for missing MetaInfo.Announce everywhere with trackers
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math"
11         "math/rand"
12         "net"
13         "os"
14         "sync"
15         "text/tabwriter"
16         "time"
17
18         "github.com/anacrolix/dht"
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/bitmap"
21         "github.com/anacrolix/missinggo/perf"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/missinggo/slices"
24         "github.com/bradfitz/iter"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/metainfo"
28         pp "github.com/anacrolix/torrent/peer_protocol"
29         "github.com/anacrolix/torrent/storage"
30         "github.com/anacrolix/torrent/tracker"
31 )
32
33 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
34         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
35 }
36
37 type peersKey struct {
38         IPBytes string
39         Port    int
40 }
41
42 // Maintains state of torrent within a Client.
43 type Torrent struct {
44         cl *Client
45
46         closed   missinggo.Event
47         infoHash metainfo.Hash
48         pieces   []piece
49         // Values are the piece indices that changed.
50         pieceStateChanges *pubsub.PubSub
51         // The size of chunks to request from peers over the wire. This is
52         // normally 16KiB by convention these days.
53         chunkSize pp.Integer
54         chunkPool *sync.Pool
55         // Total length of the torrent in bytes. Stored because it's not O(1) to
56         // get this from the info dict.
57         length int64
58
59         // The storage to open when the info dict becomes available.
60         storageOpener *storage.Client
61         // Storage for torrent data.
62         storage *storage.Torrent
63
64         metainfo metainfo.MetaInfo
65
66         // The info dict. nil if we don't have it (yet).
67         info *metainfo.Info
68         // Active peer connections, running message stream loops.
69         conns               map[*connection]struct{}
70         maxEstablishedConns int
71         // Set of addrs to which we're attempting to connect. Connections are
72         // half-open until all handshakes are completed.
73         halfOpen map[string]struct{}
74
75         // Reserve of peers to connect to. A peer can be both here and in the
76         // active connections if were told about the peer after connecting with
77         // them. That encourages us to reconnect to peers that are well known in
78         // the swarm.
79         peers          map[peersKey]Peer
80         wantPeersEvent missinggo.Event
81         // An announcer for each tracker URL.
82         trackerAnnouncers map[string]*trackerScraper
83         // How many times we've initiated a DHT announce. TODO: Move into stats.
84         numDHTAnnounces int
85
86         // Name used if the info name isn't available. Should be cleared when the
87         // Info does become available.
88         displayName string
89         // The bencoded bytes of the info dict. This is actively manipulated if
90         // the info bytes aren't initially available, and we try to fetch them
91         // from peers.
92         metadataBytes []byte
93         // Each element corresponds to the 16KiB metadata pieces. If true, we have
94         // received that piece.
95         metadataCompletedChunks []bool
96
97         // Set when .Info is obtained.
98         gotMetainfo missinggo.Event
99
100         readers               map[*Reader]struct{}
101         readerNowPieces       bitmap.Bitmap
102         readerReadaheadPieces bitmap.Bitmap
103
104         // The indexes of pieces we want with normal priority, that aren't
105         // currently available.
106         pendingPieces bitmap.Bitmap
107         // A cache of completed piece indices.
108         completedPieces bitmap.Bitmap
109
110         // A pool of piece priorities []int for assignment to new connections.
111         // These "inclinations" are used to give connections preference for
112         // different pieces.
113         connPieceInclinationPool sync.Pool
114         // Torrent-level statistics.
115         stats TorrentStats
116 }
117
118 // Returns a channel that is closed when the Torrent is closed.
119 func (t *Torrent) Closed() <-chan struct{} {
120         return t.closed.LockedChan(&t.cl.mu)
121 }
122
123 func (t *Torrent) setChunkSize(size pp.Integer) {
124         t.chunkSize = size
125         t.chunkPool = &sync.Pool{
126                 New: func() interface{} {
127                         return make([]byte, size)
128                 },
129         }
130 }
131
132 func (t *Torrent) setDisplayName(dn string) {
133         if t.haveInfo() {
134                 return
135         }
136         t.displayName = dn
137 }
138
139 func (t *Torrent) pieceComplete(piece int) bool {
140         return t.completedPieces.Get(piece)
141 }
142
143 func (t *Torrent) pieceCompleteUncached(piece int) bool {
144         return t.pieces[piece].Storage().GetIsComplete()
145 }
146
147 // There's a connection to that address already.
148 func (t *Torrent) addrActive(addr string) bool {
149         if _, ok := t.halfOpen[addr]; ok {
150                 return true
151         }
152         for c := range t.conns {
153                 if c.remoteAddr().String() == addr {
154                         return true
155                 }
156         }
157         return false
158 }
159
160 func (t *Torrent) worstUnclosedConns() (ret []*connection) {
161         ret = make([]*connection, 0, len(t.conns))
162         for c := range t.conns {
163                 if !c.closed.IsSet() {
164                         ret = append(ret, c)
165                 }
166         }
167         return
168 }
169
170 func (t *Torrent) addPeer(p Peer) {
171         cl := t.cl
172         cl.openNewConns(t)
173         if len(t.peers) >= torrentPeersHighWater {
174                 return
175         }
176         key := peersKey{string(p.IP), p.Port}
177         if _, ok := t.peers[key]; ok {
178                 return
179         }
180         t.peers[key] = p
181         peersAddedBySource.Add(string(p.Source), 1)
182         cl.openNewConns(t)
183
184 }
185
186 func (t *Torrent) invalidateMetadata() {
187         for i := range t.metadataCompletedChunks {
188                 t.metadataCompletedChunks[i] = false
189         }
190         t.info = nil
191 }
192
193 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
194         if t.haveInfo() {
195                 return
196         }
197         if index >= len(t.metadataCompletedChunks) {
198                 log.Printf("%s: ignoring metadata piece %d", t, index)
199                 return
200         }
201         copy(t.metadataBytes[(1<<14)*index:], data)
202         t.metadataCompletedChunks[index] = true
203 }
204
205 func (t *Torrent) metadataPieceCount() int {
206         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
207 }
208
209 func (t *Torrent) haveMetadataPiece(piece int) bool {
210         if t.haveInfo() {
211                 return (1<<14)*piece < len(t.metadataBytes)
212         } else {
213                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
214         }
215 }
216
217 func (t *Torrent) metadataSizeKnown() bool {
218         return t.metadataBytes != nil
219 }
220
221 func (t *Torrent) metadataSize() int {
222         return len(t.metadataBytes)
223 }
224
225 func infoPieceHashes(info *metainfo.Info) (ret []string) {
226         for i := 0; i < len(info.Pieces); i += sha1.Size {
227                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
228         }
229         return
230 }
231
232 func (t *Torrent) makePieces() {
233         hashes := infoPieceHashes(t.info)
234         t.pieces = make([]piece, len(hashes))
235         for i, hash := range hashes {
236                 piece := &t.pieces[i]
237                 piece.t = t
238                 piece.index = i
239                 piece.noPendingWrites.L = &piece.pendingWritesMutex
240                 missinggo.CopyExact(piece.Hash[:], hash)
241         }
242 }
243
244 // Called when metadata for a torrent becomes available.
245 func (t *Torrent) setInfoBytes(b []byte) error {
246         if t.haveInfo() {
247                 return nil
248         }
249         if metainfo.HashBytes(b) != t.infoHash {
250                 return errors.New("info bytes have wrong hash")
251         }
252         var info metainfo.Info
253         err := bencode.Unmarshal(b, &info)
254         if err != nil {
255                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
256         }
257         err = validateInfo(&info)
258         if err != nil {
259                 return fmt.Errorf("bad info: %s", err)
260         }
261         defer t.updateWantPeersEvent()
262         t.info = &info
263         t.displayName = "" // Save a few bytes lol.
264         t.cl.event.Broadcast()
265         t.gotMetainfo.Set()
266         t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
267         if err != nil {
268                 return fmt.Errorf("error opening torrent storage: %s", err)
269         }
270         t.length = 0
271         for _, f := range t.info.UpvertedFiles() {
272                 t.length += f.Length
273         }
274         t.metadataBytes = b
275         t.metadataCompletedChunks = nil
276         t.makePieces()
277         for conn := range t.conns {
278                 if err := conn.setNumPieces(t.numPieces()); err != nil {
279                         log.Printf("closing connection: %s", err)
280                         conn.Close()
281                 }
282         }
283         for i := range t.pieces {
284                 t.updatePieceCompletion(i)
285                 t.pieces[i].QueuedForHash = true
286         }
287         go func() {
288                 for i := range t.pieces {
289                         t.verifyPiece(i)
290                 }
291         }()
292         return nil
293 }
294
295 func (t *Torrent) haveAllMetadataPieces() bool {
296         if t.haveInfo() {
297                 return true
298         }
299         if t.metadataCompletedChunks == nil {
300                 return false
301         }
302         for _, have := range t.metadataCompletedChunks {
303                 if !have {
304                         return false
305                 }
306         }
307         return true
308 }
309
310 // TODO: Propagate errors to disconnect peer.
311 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
312         if t.haveInfo() {
313                 // We already know the correct metadata size.
314                 return
315         }
316         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
317                 return errors.New("bad size")
318         }
319         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
320                 return
321         }
322         t.metadataBytes = make([]byte, bytes)
323         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
324         for c := range t.conns {
325                 c.requestPendingMetadata()
326         }
327         return
328 }
329
330 // The current working name for the torrent. Either the name in the info dict,
331 // or a display name given such as by the dn value in a magnet link, or "".
332 func (t *Torrent) name() string {
333         if t.haveInfo() {
334                 return t.info.Name
335         }
336         return t.displayName
337 }
338
339 func (t *Torrent) pieceState(index int) (ret PieceState) {
340         p := &t.pieces[index]
341         ret.Priority = t.piecePriority(index)
342         if t.pieceComplete(index) {
343                 ret.Complete = true
344         }
345         if p.QueuedForHash || p.Hashing {
346                 ret.Checking = true
347         }
348         if !ret.Complete && t.piecePartiallyDownloaded(index) {
349                 ret.Partial = true
350         }
351         return
352 }
353
354 func (t *Torrent) metadataPieceSize(piece int) int {
355         return metadataPieceSize(len(t.metadataBytes), piece)
356 }
357
358 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
359         d := map[string]int{
360                 "msg_type": msgType,
361                 "piece":    piece,
362         }
363         if data != nil {
364                 d["total_size"] = len(t.metadataBytes)
365         }
366         p, err := bencode.Marshal(d)
367         if err != nil {
368                 panic(err)
369         }
370         return pp.Message{
371                 Type:            pp.Extended,
372                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
373                 ExtendedPayload: append(p, data...),
374         }
375 }
376
377 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
378         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
379                 ret = append(ret, PieceStateRun{
380                         PieceState: el.(PieceState),
381                         Length:     int(count),
382                 })
383         })
384         for index := range t.pieces {
385                 rle.Append(t.pieceState(index), 1)
386         }
387         rle.Flush()
388         return
389 }
390
391 // Produces a small string representing a PieceStateRun.
392 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
393         ret = fmt.Sprintf("%d", psr.Length)
394         ret += func() string {
395                 switch psr.Priority {
396                 case PiecePriorityNext:
397                         return "N"
398                 case PiecePriorityNormal:
399                         return "."
400                 case PiecePriorityReadahead:
401                         return "R"
402                 case PiecePriorityNow:
403                         return "!"
404                 default:
405                         return ""
406                 }
407         }()
408         if psr.Checking {
409                 ret += "H"
410         }
411         if psr.Partial {
412                 ret += "P"
413         }
414         if psr.Complete {
415                 ret += "C"
416         }
417         return
418 }
419
420 func (t *Torrent) writeStatus(w io.Writer) {
421         fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
422         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
423         if !t.haveInfo() {
424                 fmt.Fprintf(w, "Metadata have: ")
425                 for _, h := range t.metadataCompletedChunks {
426                         fmt.Fprintf(w, "%c", func() rune {
427                                 if h {
428                                         return 'H'
429                                 } else {
430                                         return '.'
431                                 }
432                         }())
433                 }
434                 fmt.Fprintln(w)
435         }
436         fmt.Fprintf(w, "Piece length: %s\n", func() string {
437                 if t.haveInfo() {
438                         return fmt.Sprint(t.usualPieceSize())
439                 } else {
440                         return "?"
441                 }
442         }())
443         if t.Info() != nil {
444                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
445                 fmt.Fprint(w, "Piece States:")
446                 for _, psr := range t.pieceStateRuns() {
447                         w.Write([]byte(" "))
448                         w.Write([]byte(pieceStateRunStatusChars(psr)))
449                 }
450                 fmt.Fprintln(w)
451         }
452         fmt.Fprintf(w, "Reader Pieces:")
453         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
454                 fmt.Fprintf(w, " %d:%d", begin, end)
455                 return true
456         })
457         fmt.Fprintln(w)
458
459         fmt.Fprintf(w, "Trackers:\n")
460         func() {
461                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
462                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
463                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
464                         return l.url < r.url
465                 }).([]*trackerScraper) {
466                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
467                 }
468                 tw.Flush()
469         }()
470
471         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
472
473         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
474         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
475         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
476         conns := t.connsAsSlice()
477         slices.Sort(conns, worseConn)
478         for i, c := range conns {
479                 fmt.Fprintf(w, "%2d. ", i+1)
480                 c.WriteStatus(w, t)
481         }
482 }
483
484 func (t *Torrent) haveInfo() bool {
485         return t.info != nil
486 }
487
488 // TODO: Include URIs that weren't converted to tracker clients.
489 func (t *Torrent) announceList() (al [][]string) {
490         return t.metainfo.AnnounceList
491 }
492
493 // Returns a run-time generated MetaInfo that includes the info bytes and
494 // announce-list as currently known to the client.
495 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
496         return metainfo.MetaInfo{
497                 CreationDate: time.Now().Unix(),
498                 Comment:      "dynamic metainfo from client",
499                 CreatedBy:    "go.torrent",
500                 AnnounceList: t.announceList(),
501                 InfoBytes:    t.metadataBytes,
502         }
503 }
504
505 func (t *Torrent) BytesMissing() int64 {
506         t.mu().RLock()
507         defer t.mu().RUnlock()
508         return t.bytesMissingLocked()
509 }
510
511 func (t *Torrent) bytesMissingLocked() int64 {
512         return t.bytesLeft()
513 }
514
515 func (t *Torrent) bytesLeft() (left int64) {
516         for i := 0; i < t.numPieces(); i++ {
517                 left += int64(t.pieces[i].bytesLeft())
518         }
519         return
520 }
521
522 // Bytes left to give in tracker announces.
523 func (t *Torrent) bytesLeftAnnounce() uint64 {
524         if t.haveInfo() {
525                 return uint64(t.bytesLeft())
526         } else {
527                 return math.MaxUint64
528         }
529 }
530
531 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
532         if t.pieceComplete(piece) {
533                 return false
534         }
535         if t.pieceAllDirty(piece) {
536                 return false
537         }
538         return t.pieces[piece].hasDirtyChunks()
539 }
540
541 func (t *Torrent) usualPieceSize() int {
542         return int(t.info.PieceLength)
543 }
544
545 func (t *Torrent) numPieces() int {
546         return t.info.NumPieces()
547 }
548
549 func (t *Torrent) numPiecesCompleted() (num int) {
550         return t.completedPieces.Len()
551 }
552
553 func (t *Torrent) close() (err error) {
554         t.closed.Set()
555         if t.storage != nil {
556                 t.storage.Close()
557         }
558         for conn := range t.conns {
559                 conn.Close()
560         }
561         t.pieceStateChanges.Close()
562         t.updateWantPeersEvent()
563         return
564 }
565
566 func (t *Torrent) requestOffset(r request) int64 {
567         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
568 }
569
570 // Return the request that would include the given offset into the torrent
571 // data. Returns !ok if there is no such request.
572 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
573         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
574 }
575
576 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
577         tr := perf.NewTimer()
578
579         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
580         if err == nil && n != len(data) {
581                 err = io.ErrShortWrite
582         }
583         if err == nil {
584                 tr.Stop("write chunk")
585         }
586         return
587 }
588
589 func (t *Torrent) bitfield() (bf []bool) {
590         bf = make([]bool, t.numPieces())
591         t.completedPieces.IterTyped(func(piece int) (again bool) {
592                 bf[piece] = true
593                 return true
594         })
595         return
596 }
597
598 func (t *Torrent) pieceNumChunks(piece int) int {
599         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
600 }
601
602 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
603         t.pieces[pieceIndex].DirtyChunks.Clear()
604 }
605
606 type Peer struct {
607         Id     [20]byte
608         IP     net.IP
609         Port   int
610         Source peerSource
611         // Peer is known to support encryption.
612         SupportsEncryption bool
613 }
614
615 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
616         if piece < 0 || piece >= t.info.NumPieces() {
617                 return
618         }
619         if piece == t.numPieces()-1 {
620                 len_ = pp.Integer(t.length % t.info.PieceLength)
621         }
622         if len_ == 0 {
623                 len_ = pp.Integer(t.info.PieceLength)
624         }
625         return
626 }
627
628 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
629         hash := pieceHash.New()
630         p := &t.pieces[piece]
631         p.waitNoPendingWrites()
632         ip := t.info.Piece(piece)
633         pl := ip.Length()
634         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
635         if n == pl {
636                 missinggo.CopyExact(&ret, hash.Sum(nil))
637                 return
638         }
639         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
640                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
641         }
642         return
643 }
644
645 func (t *Torrent) haveAllPieces() bool {
646         if !t.haveInfo() {
647                 return false
648         }
649         return t.completedPieces.Len() == t.numPieces()
650 }
651
652 func (t *Torrent) haveAnyPieces() bool {
653         for i := range t.pieces {
654                 if t.pieceComplete(i) {
655                         return true
656                 }
657         }
658         return false
659 }
660
661 func (t *Torrent) havePiece(index int) bool {
662         return t.haveInfo() && t.pieceComplete(index)
663 }
664
665 func (t *Torrent) haveChunk(r request) (ret bool) {
666         // defer func() {
667         //      log.Println("have chunk", r, ret)
668         // }()
669         if !t.haveInfo() {
670                 return false
671         }
672         if t.pieceComplete(int(r.Index)) {
673                 return true
674         }
675         p := &t.pieces[r.Index]
676         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
677 }
678
679 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
680         return int(cs.Begin / chunkSize)
681 }
682
683 func (t *Torrent) wantPiece(r request) bool {
684         if !t.wantPieceIndex(int(r.Index)) {
685                 return false
686         }
687         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
688                 return true
689         }
690         // TODO: What about pieces that were wanted, but aren't now, and aren't
691         // completed either? That used to be done here.
692         return false
693 }
694
695 func (t *Torrent) wantPieceIndex(index int) bool {
696         if !t.haveInfo() {
697                 return false
698         }
699         if index < 0 || index >= t.numPieces() {
700                 return false
701         }
702         p := &t.pieces[index]
703         if p.QueuedForHash {
704                 return false
705         }
706         if p.Hashing {
707                 return false
708         }
709         if t.pieceComplete(index) {
710                 return false
711         }
712         if t.pendingPieces.Contains(index) {
713                 return true
714         }
715         return !t.forReaderOffsetPieces(func(begin, end int) bool {
716                 return index < begin || index >= end
717         })
718 }
719
720 func (t *Torrent) connHasWantedPieces(c *connection) bool {
721         return !c.pieceRequestOrder.IsEmpty()
722 }
723
724 // The worst connection is one that hasn't been sent, or sent anything useful
725 // for the longest. A bad connection is one that usually sends us unwanted
726 // pieces, or has been in worser half of the established connections for more
727 // than a minute.
728 func (t *Torrent) worstBadConn() *connection {
729         wcs := worseConnSlice{t.worstUnclosedConns()}
730         for wcs.Len() != 0 {
731                 c := heap.Pop(&wcs).(*connection)
732                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
733                         return c
734                 }
735                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
736                         // Give connections 1 minute to prove themselves.
737                         if time.Since(c.completedHandshake) > time.Minute {
738                                 return c
739                         }
740                 }
741         }
742         return nil
743 }
744
745 type PieceStateChange struct {
746         Index int
747         PieceState
748 }
749
750 func (t *Torrent) publishPieceChange(piece int) {
751         cur := t.pieceState(piece)
752         p := &t.pieces[piece]
753         if cur != p.PublicPieceState {
754                 p.PublicPieceState = cur
755                 t.pieceStateChanges.Publish(PieceStateChange{
756                         piece,
757                         cur,
758                 })
759         }
760 }
761
762 func (t *Torrent) pieceNumPendingChunks(piece int) int {
763         if t.pieceComplete(piece) {
764                 return 0
765         }
766         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
767 }
768
769 func (t *Torrent) pieceAllDirty(piece int) bool {
770         return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
771 }
772
773 func (t *Torrent) readersChanged() {
774         t.updateReaderPieces()
775         t.updateAllPiecePriorities()
776 }
777
778 func (t *Torrent) updateReaderPieces() {
779         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
780 }
781
782 func (t *Torrent) readerPosChanged(from, to pieceRange) {
783         if from == to {
784                 return
785         }
786         t.updateReaderPieces()
787         // Order the ranges, high and low.
788         l, h := from, to
789         if l.begin > h.begin {
790                 l, h = h, l
791         }
792         if l.end < h.begin {
793                 // Two distinct ranges.
794                 t.updatePiecePriorities(l.begin, l.end)
795                 t.updatePiecePriorities(h.begin, h.end)
796         } else {
797                 // Ranges overlap.
798                 end := l.end
799                 if h.end > end {
800                         end = h.end
801                 }
802                 t.updatePiecePriorities(l.begin, end)
803         }
804 }
805
806 func (t *Torrent) maybeNewConns() {
807         // Tickle the accept routine.
808         t.cl.event.Broadcast()
809         t.openNewConns()
810 }
811
812 func (t *Torrent) piecePriorityChanged(piece int) {
813         for c := range t.conns {
814                 c.updatePiecePriority(piece)
815         }
816         t.maybeNewConns()
817         t.publishPieceChange(piece)
818 }
819
820 func (t *Torrent) updatePiecePriority(piece int) {
821         p := &t.pieces[piece]
822         newPrio := t.piecePriorityUncached(piece)
823         if newPrio == p.priority {
824                 return
825         }
826         p.priority = newPrio
827         t.piecePriorityChanged(piece)
828 }
829
830 func (t *Torrent) updateAllPiecePriorities() {
831         t.updatePiecePriorities(0, len(t.pieces))
832 }
833
834 // Update all piece priorities in one hit. This function should have the same
835 // output as updatePiecePriority, but across all pieces.
836 func (t *Torrent) updatePiecePriorities(begin, end int) {
837         for i := begin; i < end; i++ {
838                 t.updatePiecePriority(i)
839         }
840 }
841
842 // Returns the range of pieces [begin, end) that contains the extent of bytes.
843 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
844         if off >= t.length {
845                 return
846         }
847         if off < 0 {
848                 size += off
849                 off = 0
850         }
851         if size <= 0 {
852                 return
853         }
854         begin = int(off / t.info.PieceLength)
855         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
856         if end > t.info.NumPieces() {
857                 end = t.info.NumPieces()
858         }
859         return
860 }
861
862 // Returns true if all iterations complete without breaking. Returns the read
863 // regions for all readers. The reader regions should not be merged as some
864 // callers depend on this method to enumerate readers.
865 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
866         for r := range t.readers {
867                 p := r.pieces
868                 if p.begin >= p.end {
869                         continue
870                 }
871                 if !f(p.begin, p.end) {
872                         return false
873                 }
874         }
875         return true
876 }
877
878 func (t *Torrent) piecePriority(piece int) piecePriority {
879         if !t.haveInfo() {
880                 return PiecePriorityNone
881         }
882         return t.pieces[piece].priority
883 }
884
885 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
886         if t.pieceComplete(piece) {
887                 return PiecePriorityNone
888         }
889         if t.readerNowPieces.Contains(piece) {
890                 return PiecePriorityNow
891         }
892         // if t.readerNowPieces.Contains(piece - 1) {
893         //      return PiecePriorityNext
894         // }
895         if t.readerReadaheadPieces.Contains(piece) {
896                 return PiecePriorityReadahead
897         }
898         if t.pendingPieces.Contains(piece) {
899                 return PiecePriorityNormal
900         }
901         return PiecePriorityNone
902 }
903
904 func (t *Torrent) pendPiece(piece int) {
905         if t.pendingPieces.Contains(piece) {
906                 return
907         }
908         if t.havePiece(piece) {
909                 return
910         }
911         t.pendingPieces.Add(piece)
912         t.updatePiecePriority(piece)
913 }
914
915 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
916         t.pendingPieces.Sub(unpend)
917         unpend.IterTyped(func(piece int) (again bool) {
918                 t.updatePiecePriority(piece)
919                 return true
920         })
921 }
922
923 func (t *Torrent) pendPieceRange(begin, end int) {
924         for i := begin; i < end; i++ {
925                 t.pendPiece(i)
926         }
927 }
928
929 func (t *Torrent) unpendPieceRange(begin, end int) {
930         var bm bitmap.Bitmap
931         bm.AddRange(begin, end)
932         t.unpendPieces(&bm)
933 }
934
935 func (t *Torrent) pendRequest(req request) {
936         ci := chunkIndex(req.chunkSpec, t.chunkSize)
937         t.pieces[req.Index].pendChunkIndex(ci)
938 }
939
940 func (t *Torrent) pieceCompletionChanged(piece int) {
941         t.cl.event.Broadcast()
942         if t.pieceComplete(piece) {
943                 t.onPieceCompleted(piece)
944         } else {
945                 t.onIncompletePiece(piece)
946         }
947         t.updatePiecePriority(piece)
948 }
949
950 func (t *Torrent) openNewConns() {
951         t.cl.openNewConns(t)
952 }
953
954 func (t *Torrent) getConnPieceInclination() []int {
955         _ret := t.connPieceInclinationPool.Get()
956         if _ret == nil {
957                 pieceInclinationsNew.Add(1)
958                 return rand.Perm(t.numPieces())
959         }
960         pieceInclinationsReused.Add(1)
961         return _ret.([]int)
962 }
963
964 func (t *Torrent) putPieceInclination(pi []int) {
965         t.connPieceInclinationPool.Put(pi)
966         pieceInclinationsPut.Add(1)
967 }
968
969 func (t *Torrent) updatePieceCompletion(piece int) {
970         pcu := t.pieceCompleteUncached(piece)
971         changed := t.completedPieces.Get(piece) != pcu
972         t.completedPieces.Set(piece, pcu)
973         if changed {
974                 t.pieceCompletionChanged(piece)
975         }
976 }
977
978 // Non-blocking read. Client lock is not required.
979 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
980         p := &t.pieces[off/t.info.PieceLength]
981         p.waitNoPendingWrites()
982         return p.Storage().ReadAt(b, off-p.Info().Offset())
983 }
984
985 func (t *Torrent) updateAllPieceCompletions() {
986         for i := range iter.N(t.numPieces()) {
987                 t.updatePieceCompletion(i)
988         }
989 }
990
991 // Returns an error if the metadata was completed, but couldn't be set for
992 // some reason. Blame it on the last peer to contribute.
993 func (t *Torrent) maybeCompleteMetadata() error {
994         if t.haveInfo() {
995                 // Nothing to do.
996                 return nil
997         }
998         if !t.haveAllMetadataPieces() {
999                 // Don't have enough metadata pieces.
1000                 return nil
1001         }
1002         err := t.setInfoBytes(t.metadataBytes)
1003         if err != nil {
1004                 t.invalidateMetadata()
1005                 return fmt.Errorf("error setting info bytes: %s", err)
1006         }
1007         if t.cl.config.Debug {
1008                 log.Printf("%s: got metadata from peers", t)
1009         }
1010         return nil
1011 }
1012
1013 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1014         t.forReaderOffsetPieces(func(begin, end int) bool {
1015                 ret.AddRange(begin, end)
1016                 return true
1017         })
1018         return
1019 }
1020
1021 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1022         t.forReaderOffsetPieces(func(begin, end int) bool {
1023                 if end > begin {
1024                         now.Add(begin)
1025                         readahead.AddRange(begin+1, end)
1026                 }
1027                 return true
1028         })
1029         return
1030 }
1031
1032 func (t *Torrent) needData() bool {
1033         if t.closed.IsSet() {
1034                 return false
1035         }
1036         if !t.haveInfo() {
1037                 return true
1038         }
1039         if t.pendingPieces.Len() != 0 {
1040                 return true
1041         }
1042         // Read as "not all complete".
1043         return !t.readerPieces().IterTyped(func(piece int) bool {
1044                 return t.pieceComplete(piece)
1045         })
1046 }
1047
1048 func appendMissingStrings(old, new []string) (ret []string) {
1049         ret = old
1050 new:
1051         for _, n := range new {
1052                 for _, o := range old {
1053                         if o == n {
1054                                 continue new
1055                         }
1056                 }
1057                 ret = append(ret, n)
1058         }
1059         return
1060 }
1061
1062 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1063         ret = existing
1064         for minNumTiers > len(ret) {
1065                 ret = append(ret, nil)
1066         }
1067         return
1068 }
1069
1070 func (t *Torrent) addTrackers(announceList [][]string) {
1071         fullAnnounceList := &t.metainfo.AnnounceList
1072         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1073         for tierIndex, trackerURLs := range announceList {
1074                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1075         }
1076         t.startMissingTrackerScrapers()
1077         t.updateWantPeersEvent()
1078 }
1079
1080 // Don't call this before the info is available.
1081 func (t *Torrent) bytesCompleted() int64 {
1082         if !t.haveInfo() {
1083                 return 0
1084         }
1085         return t.info.TotalLength() - t.bytesLeft()
1086 }
1087
1088 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1089         t.cl.mu.Lock()
1090         defer t.cl.mu.Unlock()
1091         return t.setInfoBytes(b)
1092 }
1093
1094 // Returns true if connection is removed from torrent.Conns.
1095 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1096         _, ret = t.conns[c]
1097         delete(t.conns, c)
1098         return
1099 }
1100
1101 func (t *Torrent) dropConnection(c *connection) {
1102         t.cl.event.Broadcast()
1103         c.Close()
1104         if t.deleteConnection(c) {
1105                 t.openNewConns()
1106         }
1107 }
1108
1109 func (t *Torrent) wantPeers() bool {
1110         if t.closed.IsSet() {
1111                 return false
1112         }
1113         if len(t.peers) > torrentPeersLowWater {
1114                 return false
1115         }
1116         return t.needData() || t.seeding()
1117 }
1118
1119 func (t *Torrent) updateWantPeersEvent() {
1120         if t.wantPeers() {
1121                 t.wantPeersEvent.Set()
1122         } else {
1123                 t.wantPeersEvent.Clear()
1124         }
1125 }
1126
1127 // Returns whether the client should make effort to seed the torrent.
1128 func (t *Torrent) seeding() bool {
1129         cl := t.cl
1130         if t.closed.IsSet() {
1131                 return false
1132         }
1133         if cl.config.NoUpload {
1134                 return false
1135         }
1136         if !cl.config.Seed {
1137                 return false
1138         }
1139         if t.needData() {
1140                 return false
1141         }
1142         return true
1143 }
1144
1145 // Adds and starts tracker scrapers for tracker URLs that aren't already
1146 // running.
1147 func (t *Torrent) startMissingTrackerScrapers() {
1148         if t.cl.config.DisableTrackers {
1149                 return
1150         }
1151         for _, tier := range t.announceList() {
1152                 for _, trackerURL := range tier {
1153                         if _, ok := t.trackerAnnouncers[trackerURL]; ok {
1154                                 continue
1155                         }
1156                         newAnnouncer := &trackerScraper{
1157                                 url: trackerURL,
1158                                 t:   t,
1159                         }
1160                         if t.trackerAnnouncers == nil {
1161                                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1162                         }
1163                         t.trackerAnnouncers[trackerURL] = newAnnouncer
1164                         go newAnnouncer.Run()
1165                 }
1166         }
1167 }
1168
1169 // Returns an AnnounceRequest with fields filled out to defaults and current
1170 // values.
1171 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1172         return tracker.AnnounceRequest{
1173                 Event:    tracker.None,
1174                 NumWant:  -1,
1175                 Port:     uint16(t.cl.incomingPeerPort()),
1176                 PeerId:   t.cl.peerID,
1177                 InfoHash: t.infoHash,
1178                 Left:     t.bytesLeftAnnounce(),
1179         }
1180 }
1181
1182 // Adds peers revealed in an announce until the announce ends, or we have
1183 // enough peers.
1184 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1185         cl := t.cl
1186         // Count all the unique addresses we got during this announce.
1187         allAddrs := make(map[string]struct{})
1188         for {
1189                 select {
1190                 case v, ok := <-pvs:
1191                         if !ok {
1192                                 return
1193                         }
1194                         addPeers := make([]Peer, 0, len(v.Peers))
1195                         for _, cp := range v.Peers {
1196                                 if cp.Port == 0 {
1197                                         // Can't do anything with this.
1198                                         continue
1199                                 }
1200                                 addPeers = append(addPeers, Peer{
1201                                         IP:     cp.IP[:],
1202                                         Port:   cp.Port,
1203                                         Source: peerSourceDHTGetPeers,
1204                                 })
1205                                 key := (&net.UDPAddr{
1206                                         IP:   cp.IP[:],
1207                                         Port: cp.Port,
1208                                 }).String()
1209                                 allAddrs[key] = struct{}{}
1210                         }
1211                         cl.mu.Lock()
1212                         t.addPeers(addPeers)
1213                         numPeers := len(t.peers)
1214                         cl.mu.Unlock()
1215                         if numPeers >= torrentPeersHighWater {
1216                                 return
1217                         }
1218                 case <-t.closed.LockedChan(&cl.mu):
1219                         return
1220                 }
1221         }
1222 }
1223
1224 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1225         cl := t.cl
1226         ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1227         if err != nil {
1228                 return
1229         }
1230         t.consumeDHTAnnounce(ps.Peers)
1231         ps.Close()
1232         return
1233 }
1234
1235 func (t *Torrent) dhtAnnouncer() {
1236         cl := t.cl
1237         for {
1238                 select {
1239                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1240                 case <-t.closed.LockedChan(&cl.mu):
1241                         return
1242                 }
1243                 err := t.announceDHT(true)
1244                 if err == nil {
1245                         cl.mu.Lock()
1246                         t.numDHTAnnounces++
1247                         cl.mu.Unlock()
1248                 } else {
1249                         log.Printf("error announcing %q to DHT: %s", t, err)
1250                 }
1251                 select {
1252                 case <-t.closed.LockedChan(&cl.mu):
1253                         return
1254                 case <-time.After(5 * time.Minute):
1255                 }
1256         }
1257 }
1258
1259 func (t *Torrent) addPeers(peers []Peer) {
1260         for _, p := range peers {
1261                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1262                         continue
1263                 }
1264                 t.addPeer(p)
1265         }
1266 }
1267
1268 func (t *Torrent) Stats() TorrentStats {
1269         t.cl.mu.Lock()
1270         defer t.cl.mu.Unlock()
1271         return t.stats
1272 }
1273
1274 // Returns true if the connection is added.
1275 func (t *Torrent) addConnection(c *connection) bool {
1276         if t.cl.closed.IsSet() {
1277                 return false
1278         }
1279         if !t.wantConns() {
1280                 return false
1281         }
1282         for c0 := range t.conns {
1283                 if c.PeerID == c0.PeerID {
1284                         // Already connected to a client with that ID.
1285                         duplicateClientConns.Add(1)
1286                         return false
1287                 }
1288         }
1289         if len(t.conns) >= t.maxEstablishedConns {
1290                 c := t.worstBadConn()
1291                 if c == nil {
1292                         return false
1293                 }
1294                 if t.cl.config.Debug && missinggo.CryHeard() {
1295                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1296                 }
1297                 c.Close()
1298                 t.deleteConnection(c)
1299         }
1300         if len(t.conns) >= t.maxEstablishedConns {
1301                 panic(len(t.conns))
1302         }
1303         if c.t != nil {
1304                 panic("connection already associated with a torrent")
1305         }
1306         // Reconcile bytes transferred before connection was associated with a
1307         // torrent.
1308         t.stats.wroteBytes(c.stats.BytesWritten)
1309         t.stats.readBytes(c.stats.BytesRead)
1310         c.t = t
1311         t.conns[c] = struct{}{}
1312         return true
1313 }
1314
1315 func (t *Torrent) wantConns() bool {
1316         if t.closed.IsSet() {
1317                 return false
1318         }
1319         if !t.seeding() && !t.needData() {
1320                 return false
1321         }
1322         if len(t.conns) < t.maxEstablishedConns {
1323                 return true
1324         }
1325         return t.worstBadConn() != nil
1326 }
1327
1328 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1329         t.cl.mu.Lock()
1330         defer t.cl.mu.Unlock()
1331         oldMax = t.maxEstablishedConns
1332         t.maxEstablishedConns = max
1333         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1334         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1335                 t.dropConnection(wcs.Pop().(*connection))
1336         }
1337         t.openNewConns()
1338         return oldMax
1339 }
1340
1341 func (t *Torrent) mu() missinggo.RWLocker {
1342         return &t.cl.mu
1343 }
1344
1345 func (t *Torrent) pieceHashed(piece int, correct bool) {
1346         if t.closed.IsSet() {
1347                 return
1348         }
1349         p := &t.pieces[piece]
1350         touchers := t.reapPieceTouchers(piece)
1351         if p.EverHashed {
1352                 // Don't score the first time a piece is hashed, it could be an
1353                 // initial check.
1354                 if correct {
1355                         pieceHashedCorrect.Add(1)
1356                 } else {
1357                         log.Printf("%s: piece %d (%v) failed hash: %d connections contributed", t, piece, p.Hash, len(touchers))
1358                         pieceHashedNotCorrect.Add(1)
1359                 }
1360         }
1361         p.EverHashed = true
1362         if correct {
1363                 for _, c := range touchers {
1364                         c.goodPiecesDirtied++
1365                 }
1366                 err := p.Storage().MarkComplete()
1367                 if err != nil {
1368                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1369                 }
1370                 t.updatePieceCompletion(piece)
1371         } else {
1372                 if len(touchers) != 0 {
1373                         for _, c := range touchers {
1374                                 // Y u do dis peer?!
1375                                 c.badPiecesDirtied++
1376                         }
1377                         slices.Sort(touchers, connLessTrusted)
1378                         log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1379                                 for _, c := range touchers {
1380                                         ret = append(ret, c.netGoodPiecesDirtied())
1381                                 }
1382                                 return
1383                         }())
1384                         c := touchers[0]
1385                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1386                         c.Drop()
1387                 }
1388                 t.onIncompletePiece(piece)
1389         }
1390 }
1391
1392 func (t *Torrent) onPieceCompleted(piece int) {
1393         t.pendingPieces.Remove(piece)
1394         t.pendAllChunkSpecs(piece)
1395         for conn := range t.conns {
1396                 conn.Have(piece)
1397                 for r := range conn.Requests {
1398                         if int(r.Index) == piece {
1399                                 conn.Cancel(r)
1400                         }
1401                 }
1402                 // Could check here if peer doesn't have piece, but due to caching
1403                 // some peers may have said they have a piece but they don't.
1404                 conn.upload()
1405         }
1406 }
1407
1408 func (t *Torrent) onIncompletePiece(piece int) {
1409         if t.pieceAllDirty(piece) {
1410                 t.pendAllChunkSpecs(piece)
1411         }
1412         if !t.wantPieceIndex(piece) {
1413                 return
1414         }
1415         // We could drop any connections that we told we have a piece that we
1416         // don't here. But there's a test failure, and it seems clients don't care
1417         // if you request pieces that you already claim to have. Pruning bad
1418         // connections might just remove any connections that aren't treating us
1419         // favourably anyway.
1420
1421         // for c := range t.conns {
1422         //      if c.sentHave(piece) {
1423         //              c.Drop()
1424         //      }
1425         // }
1426         for conn := range t.conns {
1427                 if conn.PeerHasPiece(piece) {
1428                         conn.updateRequests()
1429                 }
1430         }
1431 }
1432
1433 func (t *Torrent) verifyPiece(piece int) {
1434         cl := t.cl
1435         cl.mu.Lock()
1436         defer cl.mu.Unlock()
1437         p := &t.pieces[piece]
1438         for p.Hashing || t.storage == nil {
1439                 cl.event.Wait()
1440         }
1441         p.QueuedForHash = false
1442         if t.closed.IsSet() || t.pieceComplete(piece) {
1443                 t.updatePiecePriority(piece)
1444                 return
1445         }
1446         p.Hashing = true
1447         t.publishPieceChange(piece)
1448         cl.mu.Unlock()
1449         sum := t.hashPiece(piece)
1450         cl.mu.Lock()
1451         p.Hashing = false
1452         t.pieceHashed(piece, sum == p.Hash)
1453 }
1454
1455 // Return the connections that touched a piece, and clear the entry while
1456 // doing it.
1457 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1458         for c := range t.conns {
1459                 if _, ok := c.peerTouchedPieces[piece]; ok {
1460                         ret = append(ret, c)
1461                         delete(c.peerTouchedPieces, piece)
1462                 }
1463         }
1464         return
1465 }
1466
1467 func (t *Torrent) connsAsSlice() (ret []*connection) {
1468         for c := range t.conns {
1469                 ret = append(ret, c)
1470         }
1471         return
1472 }
1473
1474 // Currently doesn't really queue, but should in the future.
1475 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1476         piece := &t.pieces[pieceIndex]
1477         if piece.QueuedForHash {
1478                 return
1479         }
1480         piece.QueuedForHash = true
1481         t.publishPieceChange(pieceIndex)
1482         go t.verifyPiece(pieceIndex)
1483 }