]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
b9ac2f55e32b98d8f094ed6b6aa3e98695d3e87e
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math"
11         "math/rand"
12         "net"
13         "os"
14         "sync"
15         "text/tabwriter"
16         "time"
17
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/bitmap"
20         "github.com/anacrolix/missinggo/itertools"
21         "github.com/anacrolix/missinggo/perf"
22         "github.com/anacrolix/missinggo/pubsub"
23         "github.com/anacrolix/missinggo/slices"
24         "github.com/bradfitz/iter"
25
26         "github.com/anacrolix/torrent/bencode"
27         "github.com/anacrolix/torrent/dht"
28         "github.com/anacrolix/torrent/metainfo"
29         pp "github.com/anacrolix/torrent/peer_protocol"
30         "github.com/anacrolix/torrent/storage"
31         "github.com/anacrolix/torrent/tracker"
32 )
33
34 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
35         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
36 }
37
38 type peersKey struct {
39         IPBytes string
40         Port    int
41 }
42
43 // Maintains state of torrent within a Client.
44 type Torrent struct {
45         cl *Client
46
47         closed   missinggo.Event
48         infoHash metainfo.Hash
49         pieces   []piece
50         // Values are the piece indices that changed.
51         pieceStateChanges *pubsub.PubSub
52         // The size of chunks to request from peers over the wire. This is
53         // normally 16KiB by convention these days.
54         chunkSize pp.Integer
55         chunkPool *sync.Pool
56         // Total length of the torrent in bytes. Stored because it's not O(1) to
57         // get this from the info dict.
58         length int64
59
60         // The storage to open when the info dict becomes available.
61         storageOpener *storage.Client
62         // Storage for torrent data.
63         storage *storage.Torrent
64
65         metainfo metainfo.MetaInfo
66
67         // The info dict. nil if we don't have it (yet).
68         info *metainfo.Info
69         // Active peer connections, running message stream loops.
70         conns               []*connection
71         maxEstablishedConns int
72         // Set of addrs to which we're attempting to connect. Connections are
73         // half-open until all handshakes are completed.
74         halfOpen map[string]struct{}
75
76         // Reserve of peers to connect to. A peer can be both here and in the
77         // active connections if were told about the peer after connecting with
78         // them. That encourages us to reconnect to peers that are well known in
79         // the swarm.
80         peers          map[peersKey]Peer
81         wantPeersEvent missinggo.Event
82         // An announcer for each tracker URL.
83         trackerAnnouncers map[string]*trackerScraper
84         // How many times we've initiated a DHT announce. TODO: Move into stats.
85         numDHTAnnounces int
86
87         // Name used if the info name isn't available. Should be cleared when the
88         // Info does become available.
89         displayName string
90         // The bencoded bytes of the info dict. This is actively manipulated if
91         // the info bytes aren't initially available, and we try to fetch them
92         // from peers.
93         metadataBytes []byte
94         // Each element corresponds to the 16KiB metadata pieces. If true, we have
95         // received that piece.
96         metadataCompletedChunks []bool
97
98         // Set when .Info is obtained.
99         gotMetainfo missinggo.Event
100
101         readers               map[*Reader]struct{}
102         readerNowPieces       bitmap.Bitmap
103         readerReadaheadPieces bitmap.Bitmap
104
105         // The indexes of pieces we want with normal priority, that aren't
106         // currently available.
107         pendingPieces bitmap.Bitmap
108         // A cache of completed piece indices.
109         completedPieces bitmap.Bitmap
110
111         // A pool of piece priorities []int for assignment to new connections.
112         // These "inclinations" are used to give connections preference for
113         // different pieces.
114         connPieceInclinationPool sync.Pool
115         // Torrent-level statistics.
116         stats TorrentStats
117 }
118
119 func (t *Torrent) setChunkSize(size pp.Integer) {
120         t.chunkSize = size
121         t.chunkPool = &sync.Pool{
122                 New: func() interface{} {
123                         return make([]byte, size)
124                 },
125         }
126 }
127
128 func (t *Torrent) setDisplayName(dn string) {
129         if t.haveInfo() {
130                 return
131         }
132         t.displayName = dn
133 }
134
135 func (t *Torrent) pieceComplete(piece int) bool {
136         return t.completedPieces.Get(piece)
137 }
138
139 func (t *Torrent) pieceCompleteUncached(piece int) bool {
140         return t.pieces[piece].Storage().GetIsComplete()
141 }
142
143 func (t *Torrent) numConnsUnchoked() (num int) {
144         for _, c := range t.conns {
145                 if !c.PeerChoked {
146                         num++
147                 }
148         }
149         return
150 }
151
152 // There's a connection to that address already.
153 func (t *Torrent) addrActive(addr string) bool {
154         if _, ok := t.halfOpen[addr]; ok {
155                 return true
156         }
157         for _, c := range t.conns {
158                 if c.remoteAddr().String() == addr {
159                         return true
160                 }
161         }
162         return false
163 }
164
165 func (t *Torrent) worstUnclosedConns() (ret []*connection) {
166         ret = make([]*connection, 0, len(t.conns))
167         for _, c := range t.conns {
168                 if !c.closed.IsSet() {
169                         ret = append(ret, c)
170                 }
171         }
172         return
173 }
174
175 func (t *Torrent) addPeer(p Peer) {
176         cl := t.cl
177         cl.openNewConns(t)
178         if len(t.peers) >= torrentPeersHighWater {
179                 return
180         }
181         key := peersKey{string(p.IP), p.Port}
182         if _, ok := t.peers[key]; ok {
183                 return
184         }
185         t.peers[key] = p
186         peersAddedBySource.Add(string(p.Source), 1)
187         cl.openNewConns(t)
188
189 }
190
191 func (t *Torrent) invalidateMetadata() {
192         for i := range t.metadataCompletedChunks {
193                 t.metadataCompletedChunks[i] = false
194         }
195         t.info = nil
196 }
197
198 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
199         if t.haveInfo() {
200                 return
201         }
202         if index >= len(t.metadataCompletedChunks) {
203                 log.Printf("%s: ignoring metadata piece %d", t, index)
204                 return
205         }
206         copy(t.metadataBytes[(1<<14)*index:], data)
207         t.metadataCompletedChunks[index] = true
208 }
209
210 func (t *Torrent) metadataPieceCount() int {
211         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
212 }
213
214 func (t *Torrent) haveMetadataPiece(piece int) bool {
215         if t.haveInfo() {
216                 return (1<<14)*piece < len(t.metadataBytes)
217         } else {
218                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
219         }
220 }
221
222 func (t *Torrent) metadataSizeKnown() bool {
223         return t.metadataBytes != nil
224 }
225
226 func (t *Torrent) metadataSize() int {
227         return len(t.metadataBytes)
228 }
229
230 func infoPieceHashes(info *metainfo.Info) (ret []string) {
231         for i := 0; i < len(info.Pieces); i += sha1.Size {
232                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
233         }
234         return
235 }
236
237 func (t *Torrent) makePieces() {
238         hashes := infoPieceHashes(t.info)
239         t.pieces = make([]piece, len(hashes))
240         for i, hash := range hashes {
241                 piece := &t.pieces[i]
242                 piece.t = t
243                 piece.index = i
244                 piece.noPendingWrites.L = &piece.pendingWritesMutex
245                 missinggo.CopyExact(piece.Hash[:], hash)
246         }
247 }
248
249 // Called when metadata for a torrent becomes available.
250 func (t *Torrent) setInfoBytes(b []byte) error {
251         if t.haveInfo() {
252                 return nil
253         }
254         if metainfo.HashBytes(b) != t.infoHash {
255                 return errors.New("info bytes have wrong hash")
256         }
257         var info metainfo.Info
258         err := bencode.Unmarshal(b, &info)
259         if err != nil {
260                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
261         }
262         err = validateInfo(&info)
263         if err != nil {
264                 return fmt.Errorf("bad info: %s", err)
265         }
266         defer t.updateWantPeersEvent()
267         t.info = &info
268         t.displayName = "" // Save a few bytes lol.
269         t.cl.event.Broadcast()
270         t.gotMetainfo.Set()
271         t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
272         if err != nil {
273                 return fmt.Errorf("error opening torrent storage: %s", err)
274         }
275         t.length = 0
276         for _, f := range t.info.UpvertedFiles() {
277                 t.length += f.Length
278         }
279         t.metadataBytes = b
280         t.metadataCompletedChunks = nil
281         t.makePieces()
282         for _, conn := range t.conns {
283                 if err := conn.setNumPieces(t.numPieces()); err != nil {
284                         log.Printf("closing connection: %s", err)
285                         conn.Close()
286                 }
287         }
288         for i := range t.pieces {
289                 t.updatePieceCompletion(i)
290                 t.pieces[i].QueuedForHash = true
291         }
292         go func() {
293                 for i := range t.pieces {
294                         t.verifyPiece(i)
295                 }
296         }()
297         return nil
298 }
299
300 func (t *Torrent) verifyPiece(piece int) {
301         t.cl.verifyPiece(t, piece)
302 }
303
304 func (t *Torrent) haveAllMetadataPieces() bool {
305         if t.haveInfo() {
306                 return true
307         }
308         if t.metadataCompletedChunks == nil {
309                 return false
310         }
311         for _, have := range t.metadataCompletedChunks {
312                 if !have {
313                         return false
314                 }
315         }
316         return true
317 }
318
319 // TODO: Propagate errors to disconnect peer.
320 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
321         if t.haveInfo() {
322                 // We already know the correct metadata size.
323                 return
324         }
325         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
326                 return errors.New("bad size")
327         }
328         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
329                 return
330         }
331         t.metadataBytes = make([]byte, bytes)
332         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
333         for _, c := range t.conns {
334                 c.requestPendingMetadata()
335         }
336         return
337 }
338
339 // The current working name for the torrent. Either the name in the info dict,
340 // or a display name given such as by the dn value in a magnet link, or "".
341 func (t *Torrent) name() string {
342         if t.haveInfo() {
343                 return t.info.Name
344         }
345         return t.displayName
346 }
347
348 func (t *Torrent) pieceState(index int) (ret PieceState) {
349         p := &t.pieces[index]
350         ret.Priority = t.piecePriority(index)
351         if t.pieceComplete(index) {
352                 ret.Complete = true
353         }
354         if p.QueuedForHash || p.Hashing {
355                 ret.Checking = true
356         }
357         if !ret.Complete && t.piecePartiallyDownloaded(index) {
358                 ret.Partial = true
359         }
360         return
361 }
362
363 func (t *Torrent) metadataPieceSize(piece int) int {
364         return metadataPieceSize(len(t.metadataBytes), piece)
365 }
366
367 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
368         d := map[string]int{
369                 "msg_type": msgType,
370                 "piece":    piece,
371         }
372         if data != nil {
373                 d["total_size"] = len(t.metadataBytes)
374         }
375         p, err := bencode.Marshal(d)
376         if err != nil {
377                 panic(err)
378         }
379         return pp.Message{
380                 Type:            pp.Extended,
381                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
382                 ExtendedPayload: append(p, data...),
383         }
384 }
385
386 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
387         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
388                 ret = append(ret, PieceStateRun{
389                         PieceState: el.(PieceState),
390                         Length:     int(count),
391                 })
392         })
393         for index := range t.pieces {
394                 rle.Append(t.pieceState(index), 1)
395         }
396         rle.Flush()
397         return
398 }
399
400 // Produces a small string representing a PieceStateRun.
401 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
402         ret = fmt.Sprintf("%d", psr.Length)
403         ret += func() string {
404                 switch psr.Priority {
405                 case PiecePriorityNext:
406                         return "N"
407                 case PiecePriorityNormal:
408                         return "."
409                 case PiecePriorityReadahead:
410                         return "R"
411                 case PiecePriorityNow:
412                         return "!"
413                 default:
414                         return ""
415                 }
416         }()
417         if psr.Checking {
418                 ret += "H"
419         }
420         if psr.Partial {
421                 ret += "P"
422         }
423         if psr.Complete {
424                 ret += "C"
425         }
426         return
427 }
428
429 func (t *Torrent) writeStatus(w io.Writer) {
430         fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
431         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
432         if !t.haveInfo() {
433                 fmt.Fprintf(w, "Metadata have: ")
434                 for _, h := range t.metadataCompletedChunks {
435                         fmt.Fprintf(w, "%c", func() rune {
436                                 if h {
437                                         return 'H'
438                                 } else {
439                                         return '.'
440                                 }
441                         }())
442                 }
443                 fmt.Fprintln(w)
444         }
445         fmt.Fprintf(w, "Piece length: %s\n", func() string {
446                 if t.haveInfo() {
447                         return fmt.Sprint(t.usualPieceSize())
448                 } else {
449                         return "?"
450                 }
451         }())
452         if t.Info() != nil {
453                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
454                 fmt.Fprint(w, "Piece States:")
455                 for _, psr := range t.PieceStateRuns() {
456                         w.Write([]byte(" "))
457                         w.Write([]byte(pieceStateRunStatusChars(psr)))
458                 }
459                 fmt.Fprintln(w)
460         }
461         fmt.Fprintf(w, "Reader Pieces:")
462         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
463                 fmt.Fprintf(w, " %d:%d", begin, end)
464                 return true
465         })
466         fmt.Fprintln(w)
467
468         fmt.Fprintf(w, "Trackers:\n")
469         func() {
470                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
471                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
472                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
473                         return l.url < r.url
474                 }).([]*trackerScraper) {
475                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
476                 }
477                 tw.Flush()
478         }()
479
480         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
481
482         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
483         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
484         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
485         slices.Sort(t.conns, worseConn)
486         for i, c := range t.conns {
487                 fmt.Fprintf(w, "%2d. ", i+1)
488                 c.WriteStatus(w, t)
489         }
490 }
491
492 func (t *Torrent) haveInfo() bool {
493         return t.info != nil
494 }
495
496 // TODO: Include URIs that weren't converted to tracker clients.
497 func (t *Torrent) announceList() (al [][]string) {
498         return t.metainfo.AnnounceList
499 }
500
501 // Returns a run-time generated MetaInfo that includes the info bytes and
502 // announce-list as currently known to the client.
503 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
504         return metainfo.MetaInfo{
505                 CreationDate: time.Now().Unix(),
506                 Comment:      "dynamic metainfo from client",
507                 CreatedBy:    "go.torrent",
508                 AnnounceList: t.announceList(),
509                 InfoBytes:    t.metadataBytes,
510         }
511 }
512
513 func (t *Torrent) BytesMissing() int64 {
514         t.mu().RLock()
515         defer t.mu().RUnlock()
516         return t.bytesLeft()
517 }
518
519 func (t *Torrent) bytesLeft() (left int64) {
520         for i := 0; i < t.numPieces(); i++ {
521                 left += int64(t.pieces[i].bytesLeft())
522         }
523         return
524 }
525
526 // Bytes left to give in tracker announces.
527 func (t *Torrent) bytesLeftAnnounce() uint64 {
528         if t.haveInfo() {
529                 return uint64(t.bytesLeft())
530         } else {
531                 return math.MaxUint64
532         }
533 }
534
535 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
536         if t.pieceComplete(piece) {
537                 return false
538         }
539         if t.pieceAllDirty(piece) {
540                 return false
541         }
542         return t.pieces[piece].hasDirtyChunks()
543 }
544
545 func (t *Torrent) usualPieceSize() int {
546         return int(t.info.PieceLength)
547 }
548
549 func (t *Torrent) lastPieceSize() int {
550         return int(t.pieceLength(t.numPieces() - 1))
551 }
552
553 func (t *Torrent) numPieces() int {
554         return t.info.NumPieces()
555 }
556
557 func (t *Torrent) numPiecesCompleted() (num int) {
558         return t.completedPieces.Len()
559 }
560
561 func (t *Torrent) close() (err error) {
562         t.closed.Set()
563         if t.storage != nil {
564                 t.storage.Close()
565         }
566         for _, conn := range t.conns {
567                 conn.Close()
568         }
569         t.pieceStateChanges.Close()
570         t.updateWantPeersEvent()
571         return
572 }
573
574 func (t *Torrent) requestOffset(r request) int64 {
575         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
576 }
577
578 // Return the request that would include the given offset into the torrent
579 // data. Returns !ok if there is no such request.
580 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
581         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
582 }
583
584 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
585         tr := perf.NewTimer()
586
587         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
588         if err == nil && n != len(data) {
589                 err = io.ErrShortWrite
590         }
591         if err == nil {
592                 tr.Stop("write chunk")
593         }
594         return
595 }
596
597 func (t *Torrent) bitfield() (bf []bool) {
598         bf = make([]bool, t.numPieces())
599         t.completedPieces.IterTyped(func(piece int) (again bool) {
600                 bf[piece] = true
601                 return true
602         })
603         return
604 }
605
606 func (t *Torrent) validOutgoingRequest(r request) bool {
607         if r.Index >= pp.Integer(t.info.NumPieces()) {
608                 return false
609         }
610         if r.Begin%t.chunkSize != 0 {
611                 return false
612         }
613         if r.Length > t.chunkSize {
614                 return false
615         }
616         pieceLength := t.pieceLength(int(r.Index))
617         if r.Begin+r.Length > pieceLength {
618                 return false
619         }
620         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
621 }
622
623 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
624         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
625         var cs chunkSpec
626         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
627                 cs.Length = left
628                 if cs.Length > t.chunkSize {
629                         cs.Length = t.chunkSize
630                 }
631                 css = append(css, cs)
632                 cs.Begin += cs.Length
633         }
634         return
635 }
636
637 func (t *Torrent) pieceNumChunks(piece int) int {
638         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
639 }
640
641 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
642         t.pieces[pieceIndex].DirtyChunks.Clear()
643 }
644
645 type Peer struct {
646         Id     [20]byte
647         IP     net.IP
648         Port   int
649         Source peerSource
650         // Peer is known to support encryption.
651         SupportsEncryption bool
652 }
653
654 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
655         if piece < 0 || piece >= t.info.NumPieces() {
656                 return
657         }
658         if piece == t.numPieces()-1 {
659                 len_ = pp.Integer(t.length % t.info.PieceLength)
660         }
661         if len_ == 0 {
662                 len_ = pp.Integer(t.info.PieceLength)
663         }
664         return
665 }
666
667 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
668         hash := pieceHash.New()
669         p := &t.pieces[piece]
670         p.waitNoPendingWrites()
671         ip := t.info.Piece(piece)
672         pl := ip.Length()
673         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
674         if n == pl {
675                 missinggo.CopyExact(&ret, hash.Sum(nil))
676                 return
677         }
678         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
679                 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
680         }
681         return
682 }
683
684 func (t *Torrent) haveAllPieces() bool {
685         if !t.haveInfo() {
686                 return false
687         }
688         return t.completedPieces.Len() == t.numPieces()
689 }
690
691 func (t *Torrent) haveAnyPieces() bool {
692         for i := range t.pieces {
693                 if t.pieceComplete(i) {
694                         return true
695                 }
696         }
697         return false
698 }
699
700 func (t *Torrent) havePiece(index int) bool {
701         return t.haveInfo() && t.pieceComplete(index)
702 }
703
704 func (t *Torrent) haveChunk(r request) (ret bool) {
705         // defer func() {
706         //      log.Println("have chunk", r, ret)
707         // }()
708         if !t.haveInfo() {
709                 return false
710         }
711         if t.pieceComplete(int(r.Index)) {
712                 return true
713         }
714         p := &t.pieces[r.Index]
715         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
716 }
717
718 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
719         return int(cs.Begin / chunkSize)
720 }
721
722 func (t *Torrent) wantPiece(r request) bool {
723         if !t.wantPieceIndex(int(r.Index)) {
724                 return false
725         }
726         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
727                 return true
728         }
729         // TODO: What about pieces that were wanted, but aren't now, and aren't
730         // completed either? That used to be done here.
731         return false
732 }
733
734 func (t *Torrent) wantPieceIndex(index int) bool {
735         if !t.haveInfo() {
736                 return false
737         }
738         p := &t.pieces[index]
739         if p.QueuedForHash {
740                 return false
741         }
742         if p.Hashing {
743                 return false
744         }
745         if t.pieceComplete(index) {
746                 return false
747         }
748         if t.pendingPieces.Contains(index) {
749                 return true
750         }
751         return !t.forReaderOffsetPieces(func(begin, end int) bool {
752                 return index < begin || index >= end
753         })
754 }
755
756 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
757         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
758                 for i := begin; begin < end; i++ {
759                         if !f(i) {
760                                 return false
761                         }
762                 }
763                 return true
764         })
765 }
766
767 func (t *Torrent) connHasWantedPieces(c *connection) bool {
768         return !c.pieceRequestOrder.IsEmpty()
769 }
770
771 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
772         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
773                 pieces = append(pieces, int(i))
774         }
775         return
776 }
777
778 // The worst connection is one that hasn't been sent, or sent anything useful
779 // for the longest. A bad connection is one that usually sends us unwanted
780 // pieces, or has been in worser half of the established connections for more
781 // than a minute.
782 func (t *Torrent) worstBadConn() *connection {
783         wcs := slices.HeapInterface(t.worstUnclosedConns(), worseConn)
784         for wcs.Len() != 0 {
785                 c := heap.Pop(wcs).(*connection)
786                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
787                         return c
788                 }
789                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
790                         // Give connections 1 minute to prove themselves.
791                         if time.Since(c.completedHandshake) > time.Minute {
792                                 return c
793                         }
794                 }
795         }
796         return nil
797 }
798
799 type PieceStateChange struct {
800         Index int
801         PieceState
802 }
803
804 func (t *Torrent) publishPieceChange(piece int) {
805         cur := t.pieceState(piece)
806         p := &t.pieces[piece]
807         if cur != p.PublicPieceState {
808                 p.PublicPieceState = cur
809                 t.pieceStateChanges.Publish(PieceStateChange{
810                         piece,
811                         cur,
812                 })
813         }
814 }
815
816 func (t *Torrent) pieceNumPendingChunks(piece int) int {
817         if t.pieceComplete(piece) {
818                 return 0
819         }
820         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
821 }
822
823 func (t *Torrent) pieceAllDirty(piece int) bool {
824         return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
825 }
826
827 func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
828         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
829                 if begin < end {
830                         if !f(begin) {
831                                 return false
832                         }
833                 }
834                 return true
835         })
836 }
837
838 func (t *Torrent) readersChanged() {
839         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
840         t.updatePiecePriorities()
841 }
842
843 func (t *Torrent) maybeNewConns() {
844         // Tickle the accept routine.
845         t.cl.event.Broadcast()
846         t.openNewConns()
847 }
848
849 func (t *Torrent) piecePriorityChanged(piece int) {
850         for _, c := range t.conns {
851                 c.updatePiecePriority(piece)
852         }
853         t.maybeNewConns()
854         t.publishPieceChange(piece)
855 }
856
857 func (t *Torrent) updatePiecePriority(piece int) {
858         p := &t.pieces[piece]
859         newPrio := t.piecePriorityUncached(piece)
860         if newPrio == p.priority {
861                 return
862         }
863         p.priority = newPrio
864         t.piecePriorityChanged(piece)
865 }
866
867 // Update all piece priorities in one hit. This function should have the same
868 // output as updatePiecePriority, but across all pieces.
869 func (t *Torrent) updatePiecePriorities() {
870         for i := range t.pieces {
871                 t.updatePiecePriority(i)
872         }
873 }
874
875 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
876         if off >= t.length {
877                 return
878         }
879         if off < 0 {
880                 size += off
881                 off = 0
882         }
883         if size <= 0 {
884                 return
885         }
886         begin = int(off / t.info.PieceLength)
887         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
888         if end > t.info.NumPieces() {
889                 end = t.info.NumPieces()
890         }
891         return
892 }
893
894 // Returns true if all iterations complete without breaking. Returns the read
895 // regions for all readers. The reader regions should not be merged as some
896 // callers depend on this method to enumerate readers.
897 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
898         for r := range t.readers {
899                 // r.mu.Lock()
900                 pos, readahead := r.pos, r.readahead
901                 // r.mu.Unlock()
902                 if readahead < 1 {
903                         readahead = 1
904                 }
905                 begin, end := t.byteRegionPieces(pos, readahead)
906                 if begin >= end {
907                         continue
908                 }
909                 if !f(begin, end) {
910                         return false
911                 }
912         }
913         return true
914 }
915
916 func (t *Torrent) piecePriority(piece int) piecePriority {
917         if !t.haveInfo() {
918                 return PiecePriorityNone
919         }
920         return t.pieces[piece].priority
921 }
922
923 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
924         if t.pieceComplete(piece) {
925                 return PiecePriorityNone
926         }
927         if t.readerNowPieces.Contains(piece) {
928                 return PiecePriorityNow
929         }
930         // if t.readerNowPieces.Contains(piece - 1) {
931         //      return PiecePriorityNext
932         // }
933         if t.readerReadaheadPieces.Contains(piece) {
934                 return PiecePriorityReadahead
935         }
936         if t.pendingPieces.Contains(piece) {
937                 return PiecePriorityNormal
938         }
939         return PiecePriorityNone
940 }
941
942 func (t *Torrent) pendPiece(piece int) {
943         if t.pendingPieces.Contains(piece) {
944                 return
945         }
946         if t.havePiece(piece) {
947                 return
948         }
949         t.pendingPieces.Add(piece)
950         t.updatePiecePriority(piece)
951 }
952
953 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
954         return t.completedPieces.Copy()
955 }
956
957 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
958         t.pendingPieces.Sub(unpend)
959         t.updatePiecePriorities()
960 }
961
962 func (t *Torrent) pendPieceRange(begin, end int) {
963         for i := begin; i < end; i++ {
964                 t.pendPiece(i)
965         }
966 }
967
968 func (t *Torrent) unpendPieceRange(begin, end int) {
969         var bm bitmap.Bitmap
970         bm.AddRange(begin, end)
971         t.unpendPieces(&bm)
972 }
973
974 func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
975         if !c.PeerHasPiece(piece) {
976                 return true
977         }
978         chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
979         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
980                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
981                 return c.Request(req)
982         })
983 }
984
985 func (t *Torrent) pendRequest(req request) {
986         ci := chunkIndex(req.chunkSpec, t.chunkSize)
987         t.pieces[req.Index].pendChunkIndex(ci)
988 }
989
990 func (t *Torrent) pieceChanged(piece int) {
991         t.cl.pieceChanged(t, piece)
992 }
993
994 func (t *Torrent) openNewConns() {
995         t.cl.openNewConns(t)
996 }
997
998 func (t *Torrent) getConnPieceInclination() []int {
999         _ret := t.connPieceInclinationPool.Get()
1000         if _ret == nil {
1001                 pieceInclinationsNew.Add(1)
1002                 return rand.Perm(t.numPieces())
1003         }
1004         pieceInclinationsReused.Add(1)
1005         return _ret.([]int)
1006 }
1007
1008 func (t *Torrent) putPieceInclination(pi []int) {
1009         t.connPieceInclinationPool.Put(pi)
1010         pieceInclinationsPut.Add(1)
1011 }
1012
1013 func (t *Torrent) updatePieceCompletion(piece int) {
1014         pcu := t.pieceCompleteUncached(piece)
1015         changed := t.completedPieces.Get(piece) != pcu
1016         t.completedPieces.Set(piece, pcu)
1017         if changed {
1018                 t.pieceChanged(piece)
1019         }
1020 }
1021
1022 // Non-blocking read. Client lock is not required.
1023 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1024         p := &t.pieces[off/t.info.PieceLength]
1025         p.waitNoPendingWrites()
1026         return p.Storage().ReadAt(b, off-p.Info().Offset())
1027 }
1028
1029 func (t *Torrent) updateAllPieceCompletions() {
1030         for i := range iter.N(t.numPieces()) {
1031                 t.updatePieceCompletion(i)
1032         }
1033 }
1034
1035 // Returns an error if the metadata was completed, but couldn't be set for
1036 // some reason. Blame it on the last peer to contribute.
1037 func (t *Torrent) maybeCompleteMetadata() error {
1038         if t.haveInfo() {
1039                 // Nothing to do.
1040                 return nil
1041         }
1042         if !t.haveAllMetadataPieces() {
1043                 // Don't have enough metadata pieces.
1044                 return nil
1045         }
1046         err := t.setInfoBytes(t.metadataBytes)
1047         if err != nil {
1048                 t.invalidateMetadata()
1049                 return fmt.Errorf("error setting info bytes: %s", err)
1050         }
1051         if t.cl.config.Debug {
1052                 log.Printf("%s: got metadata from peers", t)
1053         }
1054         return nil
1055 }
1056
1057 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1058         t.forReaderOffsetPieces(func(begin, end int) bool {
1059                 ret.AddRange(begin, end)
1060                 return true
1061         })
1062         return
1063 }
1064
1065 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1066         t.forReaderOffsetPieces(func(begin, end int) bool {
1067                 if end > begin {
1068                         now.Add(begin)
1069                         readahead.AddRange(begin+1, end)
1070                 }
1071                 return true
1072         })
1073         return
1074 }
1075
1076 func (t *Torrent) needData() bool {
1077         if t.closed.IsSet() {
1078                 return false
1079         }
1080         if !t.haveInfo() {
1081                 return true
1082         }
1083         if t.pendingPieces.Len() != 0 {
1084                 return true
1085         }
1086         // Read as "not all complete".
1087         return !t.readerPieces().IterTyped(func(piece int) bool {
1088                 return t.pieceComplete(piece)
1089         })
1090 }
1091
1092 func appendMissingStrings(old, new []string) (ret []string) {
1093         ret = old
1094 new:
1095         for _, n := range new {
1096                 for _, o := range old {
1097                         if o == n {
1098                                 continue new
1099                         }
1100                 }
1101                 ret = append(ret, n)
1102         }
1103         return
1104 }
1105
1106 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1107         ret = existing
1108         for minNumTiers > len(ret) {
1109                 ret = append(ret, nil)
1110         }
1111         return
1112 }
1113
1114 func (t *Torrent) addTrackers(announceList [][]string) {
1115         fullAnnounceList := &t.metainfo.AnnounceList
1116         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1117         for tierIndex, trackerURLs := range announceList {
1118                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1119         }
1120         t.startMissingTrackerScrapers()
1121         t.updateWantPeersEvent()
1122 }
1123
1124 // Don't call this before the info is available.
1125 func (t *Torrent) bytesCompleted() int64 {
1126         if !t.haveInfo() {
1127                 return 0
1128         }
1129         return t.info.TotalLength() - t.bytesLeft()
1130 }
1131
1132 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1133         t.cl.mu.Lock()
1134         defer t.cl.mu.Unlock()
1135         return t.setInfoBytes(b)
1136 }
1137
1138 // Returns true if connection is removed from torrent.Conns.
1139 func (t *Torrent) deleteConnection(c *connection) bool {
1140         for i0, _c := range t.conns {
1141                 if _c != c {
1142                         continue
1143                 }
1144                 i1 := len(t.conns) - 1
1145                 if i0 != i1 {
1146                         t.conns[i0] = t.conns[i1]
1147                 }
1148                 t.conns = t.conns[:i1]
1149                 return true
1150         }
1151         return false
1152 }
1153
1154 func (t *Torrent) dropConnection(c *connection) {
1155         t.cl.event.Broadcast()
1156         c.Close()
1157         if t.deleteConnection(c) {
1158                 t.openNewConns()
1159         }
1160 }
1161
1162 func (t *Torrent) wantPeers() bool {
1163         if t.closed.IsSet() {
1164                 return false
1165         }
1166         if len(t.peers) > torrentPeersLowWater {
1167                 return false
1168         }
1169         return t.needData() || t.seeding()
1170 }
1171
1172 func (t *Torrent) updateWantPeersEvent() {
1173         if t.wantPeers() {
1174                 t.wantPeersEvent.Set()
1175         } else {
1176                 t.wantPeersEvent.Clear()
1177         }
1178 }
1179
1180 // Returns whether the client should make effort to seed the torrent.
1181 func (t *Torrent) seeding() bool {
1182         cl := t.cl
1183         if t.closed.IsSet() {
1184                 return false
1185         }
1186         if cl.config.NoUpload {
1187                 return false
1188         }
1189         if !cl.config.Seed {
1190                 return false
1191         }
1192         if t.needData() {
1193                 return false
1194         }
1195         return true
1196 }
1197
1198 // Adds and starts tracker scrapers for tracker URLs that aren't already
1199 // running.
1200 func (t *Torrent) startMissingTrackerScrapers() {
1201         if t.cl.config.DisableTrackers {
1202                 return
1203         }
1204         for _, tier := range t.announceList() {
1205                 for _, trackerURL := range tier {
1206                         if _, ok := t.trackerAnnouncers[trackerURL]; ok {
1207                                 continue
1208                         }
1209                         newAnnouncer := &trackerScraper{
1210                                 url: trackerURL,
1211                                 t:   t,
1212                         }
1213                         if t.trackerAnnouncers == nil {
1214                                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1215                         }
1216                         t.trackerAnnouncers[trackerURL] = newAnnouncer
1217                         go newAnnouncer.Run()
1218                 }
1219         }
1220 }
1221
1222 // Returns an AnnounceRequest with fields filled out to defaults and current
1223 // values.
1224 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1225         return tracker.AnnounceRequest{
1226                 Event:    tracker.None,
1227                 NumWant:  -1,
1228                 Port:     uint16(t.cl.incomingPeerPort()),
1229                 PeerId:   t.cl.peerID,
1230                 InfoHash: t.infoHash,
1231                 Left:     t.bytesLeftAnnounce(),
1232         }
1233 }
1234
1235 // Adds peers revealed in an announce until the announce ends, or we have
1236 // enough peers.
1237 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1238         cl := t.cl
1239         // Count all the unique addresses we got during this announce.
1240         allAddrs := make(map[string]struct{})
1241         for {
1242                 select {
1243                 case v, ok := <-pvs:
1244                         if !ok {
1245                                 return
1246                         }
1247                         addPeers := make([]Peer, 0, len(v.Peers))
1248                         for _, cp := range v.Peers {
1249                                 if cp.Port == 0 {
1250                                         // Can't do anything with this.
1251                                         continue
1252                                 }
1253                                 addPeers = append(addPeers, Peer{
1254                                         IP:     cp.IP[:],
1255                                         Port:   cp.Port,
1256                                         Source: peerSourceDHT,
1257                                 })
1258                                 key := (&net.UDPAddr{
1259                                         IP:   cp.IP[:],
1260                                         Port: cp.Port,
1261                                 }).String()
1262                                 allAddrs[key] = struct{}{}
1263                         }
1264                         cl.mu.Lock()
1265                         t.addPeers(addPeers)
1266                         numPeers := len(t.peers)
1267                         cl.mu.Unlock()
1268                         if numPeers >= torrentPeersHighWater {
1269                                 return
1270                         }
1271                 case <-t.closed.LockedChan(&cl.mu):
1272                         return
1273                 }
1274         }
1275 }
1276
1277 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1278         cl := t.cl
1279         ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1280         if err != nil {
1281                 return
1282         }
1283         t.consumeDHTAnnounce(ps.Peers)
1284         ps.Close()
1285         return
1286 }
1287
1288 func (t *Torrent) dhtAnnouncer() {
1289         cl := t.cl
1290         for {
1291                 select {
1292                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1293                 case <-t.closed.LockedChan(&cl.mu):
1294                         return
1295                 }
1296                 err := t.announceDHT(true)
1297                 if err == nil {
1298                         cl.mu.Lock()
1299                         t.numDHTAnnounces++
1300                         cl.mu.Unlock()
1301                 } else {
1302                         log.Printf("error announcing %q to DHT: %s", t, err)
1303                 }
1304                 select {
1305                 case <-t.closed.LockedChan(&cl.mu):
1306                         return
1307                 case <-time.After(5 * time.Minute):
1308                 }
1309         }
1310 }
1311
1312 func (t *Torrent) addPeers(peers []Peer) {
1313         for _, p := range peers {
1314                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1315                         continue
1316                 }
1317                 t.addPeer(p)
1318         }
1319 }
1320
1321 func (t *Torrent) Stats() TorrentStats {
1322         t.cl.mu.Lock()
1323         defer t.cl.mu.Unlock()
1324         return t.stats
1325 }
1326
1327 // Returns true if the connection is added.
1328 func (t *Torrent) addConnection(c *connection) bool {
1329         if t.cl.closed.IsSet() {
1330                 return false
1331         }
1332         if !t.wantConns() {
1333                 return false
1334         }
1335         for _, c0 := range t.conns {
1336                 if c.PeerID == c0.PeerID {
1337                         // Already connected to a client with that ID.
1338                         duplicateClientConns.Add(1)
1339                         return false
1340                 }
1341         }
1342         if len(t.conns) >= t.maxEstablishedConns {
1343                 c := t.worstBadConn()
1344                 if c == nil {
1345                         return false
1346                 }
1347                 if t.cl.config.Debug && missinggo.CryHeard() {
1348                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1349                 }
1350                 c.Close()
1351                 t.deleteConnection(c)
1352         }
1353         if len(t.conns) >= t.maxEstablishedConns {
1354                 panic(len(t.conns))
1355         }
1356         t.conns = append(t.conns, c)
1357         if c.t != nil {
1358                 panic("connection already associated with a torrent")
1359         }
1360         // Reconcile bytes transferred before connection was associated with a
1361         // torrent.
1362         t.stats.wroteBytes(c.stats.BytesWritten)
1363         t.stats.readBytes(c.stats.BytesRead)
1364         c.t = t
1365         return true
1366 }
1367
1368 func (t *Torrent) wantConns() bool {
1369         if t.closed.IsSet() {
1370                 return false
1371         }
1372         if !t.seeding() && !t.needData() {
1373                 return false
1374         }
1375         if len(t.conns) < t.maxEstablishedConns {
1376                 return true
1377         }
1378         return t.worstBadConn() != nil
1379 }
1380
1381 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1382         t.cl.mu.Lock()
1383         defer t.cl.mu.Unlock()
1384         oldMax = t.maxEstablishedConns
1385         t.maxEstablishedConns = max
1386         wcs := slices.HeapInterface(append([]*connection(nil), t.conns...), worseConn)
1387         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1388                 t.dropConnection(wcs.Pop().(*connection))
1389         }
1390         t.openNewConns()
1391         return oldMax
1392 }
1393
1394 func (t *Torrent) mu() missinggo.RWLocker {
1395         return &t.cl.mu
1396 }