]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Some changes to metainfo.InfoEx and testutil
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "fmt"
6         "io"
7         "log"
8         "math"
9         "math/rand"
10         "net"
11         "sort"
12         "sync"
13         "time"
14
15         "github.com/anacrolix/missinggo"
16         "github.com/anacrolix/missinggo/bitmap"
17         "github.com/anacrolix/missinggo/itertools"
18         "github.com/anacrolix/missinggo/perf"
19         "github.com/anacrolix/missinggo/pubsub"
20         "github.com/bradfitz/iter"
21
22         "github.com/anacrolix/torrent/bencode"
23         "github.com/anacrolix/torrent/metainfo"
24         pp "github.com/anacrolix/torrent/peer_protocol"
25         "github.com/anacrolix/torrent/storage"
26 )
27
28 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
29         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
30 }
31
32 type peersKey struct {
33         IPBytes string
34         Port    int
35 }
36
37 // Maintains state of torrent within a Client.
38 type Torrent struct {
39         cl *Client
40
41         closing chan struct{}
42
43         // Closed when no more network activity is desired. This includes
44         // announcing, and communicating with peers.
45         ceasingNetworking chan struct{}
46
47         infoHash metainfo.Hash
48         pieces   []piece
49         // Values are the piece indices that changed.
50         pieceStateChanges *pubsub.PubSub
51         chunkSize         pp.Integer
52         // Total length of the torrent in bytes. Stored because it's not O(1) to
53         // get this from the info dict.
54         length int64
55
56         // The storage to open when the info dict becomes available.
57         storageOpener storage.I
58         // Storage for torrent data.
59         storage storage.Torrent
60
61         // The info dict. nil if we don't have it (yet).
62         info *metainfo.InfoEx
63         // Active peer connections, running message stream loops.
64         conns []*connection
65         // Set of addrs to which we're attempting to connect. Connections are
66         // half-open until all handshakes are completed.
67         halfOpen map[string]struct{}
68
69         // Reserve of peers to connect to. A peer can be both here and in the
70         // active connections if were told about the peer after connecting with
71         // them. That encourages us to reconnect to peers that are well known.
72         peers     map[peersKey]Peer
73         wantPeers sync.Cond
74
75         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
76         // mirror their respective URLs from the announce-list metainfo key.
77         trackers []trackerTier
78         // Name used if the info name isn't available.
79         displayName string
80         // The bencoded bytes of the info dict.
81         metadataBytes []byte
82         // Each element corresponds to the 16KiB metadata pieces. If true, we have
83         // received that piece.
84         metadataCompletedChunks []bool
85
86         // Closed when .Info is set.
87         gotMetainfo chan struct{}
88
89         readers map[*Reader]struct{}
90
91         pendingPieces   bitmap.Bitmap
92         completedPieces bitmap.Bitmap
93
94         connPieceInclinationPool sync.Pool
95 }
96
97 func (t *Torrent) setDisplayName(dn string) {
98         t.displayName = dn
99 }
100
101 func (t *Torrent) pieceComplete(piece int) bool {
102         return t.completedPieces.Get(piece)
103 }
104
105 func (t *Torrent) pieceCompleteUncached(piece int) bool {
106         return t.pieces[piece].Storage().GetIsComplete()
107 }
108
109 func (t *Torrent) numConnsUnchoked() (num int) {
110         for _, c := range t.conns {
111                 if !c.PeerChoked {
112                         num++
113                 }
114         }
115         return
116 }
117
118 // There's a connection to that address already.
119 func (t *Torrent) addrActive(addr string) bool {
120         if _, ok := t.halfOpen[addr]; ok {
121                 return true
122         }
123         for _, c := range t.conns {
124                 if c.remoteAddr().String() == addr {
125                         return true
126                 }
127         }
128         return false
129 }
130
131 func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
132         wcs = &worstConns{
133                 c:  make([]*connection, 0, len(t.conns)),
134                 t:  t,
135                 cl: cl,
136         }
137         for _, c := range t.conns {
138                 if !c.closed.IsSet() {
139                         wcs.c = append(wcs.c, c)
140                 }
141         }
142         return
143 }
144
145 func (t *Torrent) ceaseNetworking() {
146         select {
147         case <-t.ceasingNetworking:
148                 return
149         default:
150         }
151         close(t.ceasingNetworking)
152         for _, c := range t.conns {
153                 c.Close()
154         }
155 }
156
157 func (t *Torrent) addPeer(p Peer, cl *Client) {
158         cl.openNewConns(t)
159         if len(t.peers) >= torrentPeersHighWater {
160                 return
161         }
162         key := peersKey{string(p.IP), p.Port}
163         if _, ok := t.peers[key]; ok {
164                 return
165         }
166         t.peers[key] = p
167         peersAddedBySource.Add(string(p.Source), 1)
168         cl.openNewConns(t)
169
170 }
171
172 func (t *Torrent) invalidateMetadata() {
173         t.metadataBytes = nil
174         t.metadataCompletedChunks = nil
175         t.info = nil
176 }
177
178 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
179         if t.haveInfo() {
180                 return
181         }
182         if index >= len(t.metadataCompletedChunks) {
183                 log.Printf("%s: ignoring metadata piece %d", t, index)
184                 return
185         }
186         copy(t.metadataBytes[(1<<14)*index:], data)
187         t.metadataCompletedChunks[index] = true
188 }
189
190 func (t *Torrent) metadataPieceCount() int {
191         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
192 }
193
194 func (t *Torrent) haveMetadataPiece(piece int) bool {
195         if t.haveInfo() {
196                 return (1<<14)*piece < len(t.metadataBytes)
197         } else {
198                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
199         }
200 }
201
202 func (t *Torrent) metadataSizeKnown() bool {
203         return t.metadataBytes != nil
204 }
205
206 func (t *Torrent) metadataSize() int {
207         return len(t.metadataBytes)
208 }
209
210 func infoPieceHashes(info *metainfo.Info) (ret []string) {
211         for i := 0; i < len(info.Pieces); i += 20 {
212                 ret = append(ret, string(info.Pieces[i:i+20]))
213         }
214         return
215 }
216
217 // Called when metadata for a torrent becomes available.
218 func (t *Torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
219         err = validateInfo(md)
220         if err != nil {
221                 err = fmt.Errorf("bad info: %s", err)
222                 return
223         }
224         t.info = &metainfo.InfoEx{
225                 Info:  *md,
226                 Bytes: infoBytes,
227                 Hash:  t.infoHash,
228         }
229         t.storage, err = t.storageOpener.OpenTorrent(t.info)
230         if err != nil {
231                 return
232         }
233         t.length = 0
234         for _, f := range t.info.UpvertedFiles() {
235                 t.length += f.Length
236         }
237         t.metadataBytes = infoBytes
238         t.metadataCompletedChunks = nil
239         hashes := infoPieceHashes(md)
240         t.pieces = make([]piece, len(hashes))
241         for i, hash := range hashes {
242                 piece := &t.pieces[i]
243                 piece.t = t
244                 piece.index = i
245                 piece.noPendingWrites.L = &piece.pendingWritesMutex
246                 missinggo.CopyExact(piece.Hash[:], hash)
247         }
248         for _, conn := range t.conns {
249                 if err := conn.setNumPieces(t.numPieces()); err != nil {
250                         log.Printf("closing connection: %s", err)
251                         conn.Close()
252                 }
253         }
254         for i := range t.pieces {
255                 t.updatePieceCompletion(i)
256                 t.pieces[i].QueuedForHash = true
257         }
258         go func() {
259                 for i := range t.pieces {
260                         t.verifyPiece(i)
261                 }
262         }()
263         return
264 }
265
266 func (t *Torrent) verifyPiece(piece int) {
267         t.cl.verifyPiece(t, piece)
268 }
269
270 func (t *Torrent) haveAllMetadataPieces() bool {
271         if t.haveInfo() {
272                 return true
273         }
274         if t.metadataCompletedChunks == nil {
275                 return false
276         }
277         for _, have := range t.metadataCompletedChunks {
278                 if !have {
279                         return false
280                 }
281         }
282         return true
283 }
284
285 // TODO: Propagate errors to disconnect peer.
286 func (t *Torrent) setMetadataSize(bytes int64, cl *Client) {
287         if t.haveInfo() {
288                 // We already know the correct metadata size.
289                 return
290         }
291         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
292                 log.Printf("%s: received bad metadata size: %d", t, bytes)
293                 return
294         }
295         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
296                 return
297         }
298         t.metadataBytes = make([]byte, bytes)
299         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
300         for _, c := range t.conns {
301                 cl.requestPendingMetadata(t, c)
302         }
303
304 }
305
306 // The current working name for the torrent. Either the name in the info dict,
307 // or a display name given such as by the dn value in a magnet link, or "".
308 func (t *Torrent) name() string {
309         if t.haveInfo() {
310                 return t.info.Name
311         }
312         return t.displayName
313 }
314
315 func (t *Torrent) pieceState(index int) (ret PieceState) {
316         p := &t.pieces[index]
317         ret.Priority = t.piecePriority(index)
318         if t.pieceComplete(index) {
319                 ret.Complete = true
320         }
321         if p.QueuedForHash || p.Hashing {
322                 ret.Checking = true
323         }
324         if !ret.Complete && t.piecePartiallyDownloaded(index) {
325                 ret.Partial = true
326         }
327         return
328 }
329
330 func (t *Torrent) metadataPieceSize(piece int) int {
331         return metadataPieceSize(len(t.metadataBytes), piece)
332 }
333
334 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
335         d := map[string]int{
336                 "msg_type": msgType,
337                 "piece":    piece,
338         }
339         if data != nil {
340                 d["total_size"] = len(t.metadataBytes)
341         }
342         p, err := bencode.Marshal(d)
343         if err != nil {
344                 panic(err)
345         }
346         return pp.Message{
347                 Type:            pp.Extended,
348                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
349                 ExtendedPayload: append(p, data...),
350         }
351 }
352
353 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
354         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
355                 ret = append(ret, PieceStateRun{
356                         PieceState: el.(PieceState),
357                         Length:     int(count),
358                 })
359         })
360         for index := range t.pieces {
361                 rle.Append(t.pieceState(index), 1)
362         }
363         rle.Flush()
364         return
365 }
366
367 // Produces a small string representing a PieceStateRun.
368 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
369         ret = fmt.Sprintf("%d", psr.Length)
370         ret += func() string {
371                 switch psr.Priority {
372                 case PiecePriorityNext:
373                         return "N"
374                 case PiecePriorityNormal:
375                         return "."
376                 case PiecePriorityReadahead:
377                         return "R"
378                 case PiecePriorityNow:
379                         return "!"
380                 default:
381                         return ""
382                 }
383         }()
384         if psr.Checking {
385                 ret += "H"
386         }
387         if psr.Partial {
388                 ret += "P"
389         }
390         if psr.Complete {
391                 ret += "C"
392         }
393         return
394 }
395
396 func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
397         fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
398         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
399         if !t.haveInfo() {
400                 fmt.Fprintf(w, "Metadata have: ")
401                 for _, h := range t.metadataCompletedChunks {
402                         fmt.Fprintf(w, "%c", func() rune {
403                                 if h {
404                                         return 'H'
405                                 } else {
406                                         return '.'
407                                 }
408                         }())
409                 }
410                 fmt.Fprintln(w)
411         }
412         fmt.Fprintf(w, "Piece length: %s\n", func() string {
413                 if t.haveInfo() {
414                         return fmt.Sprint(t.usualPieceSize())
415                 } else {
416                         return "?"
417                 }
418         }())
419         if t.haveInfo() {
420                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
421                 fmt.Fprint(w, "Piece States:")
422                 for _, psr := range t.pieceStateRuns() {
423                         w.Write([]byte(" "))
424                         w.Write([]byte(pieceStateRunStatusChars(psr)))
425                 }
426                 fmt.Fprintln(w)
427         }
428         fmt.Fprintf(w, "Reader Pieces:")
429         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
430                 fmt.Fprintf(w, " %d:%d", begin, end)
431                 return true
432         })
433         fmt.Fprintln(w)
434         fmt.Fprintf(w, "Trackers: ")
435         for _, tier := range t.trackers {
436                 for _, tr := range tier {
437                         fmt.Fprintf(w, "%q ", tr)
438                 }
439         }
440         fmt.Fprintf(w, "\n")
441         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
442         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
443         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
444         sort.Sort(&worstConns{
445                 c:  t.conns,
446                 t:  t,
447                 cl: cl,
448         })
449         for i, c := range t.conns {
450                 fmt.Fprintf(w, "%2d. ", i+1)
451                 c.WriteStatus(w, t)
452         }
453 }
454
455 func (t *Torrent) String() string {
456         s := t.name()
457         if s == "" {
458                 s = fmt.Sprintf("%x", t.infoHash)
459         }
460         return s
461 }
462
463 func (t *Torrent) haveInfo() bool {
464         return t.info != nil
465 }
466
467 // TODO: Include URIs that weren't converted to tracker clients.
468 func (t *Torrent) announceList() (al [][]string) {
469         missinggo.CastSlice(&al, t.trackers)
470         return
471 }
472
473 // Returns a run-time generated MetaInfo that includes the info bytes and
474 // announce-list as currently known to the client.
475 func (t *Torrent) metainfo() *metainfo.MetaInfo {
476         if t.metadataBytes == nil {
477                 panic("info bytes not set")
478         }
479         return &metainfo.MetaInfo{
480                 Info:         *t.info,
481                 CreationDate: time.Now().Unix(),
482                 Comment:      "dynamic metainfo from client",
483                 CreatedBy:    "go.torrent",
484                 AnnounceList: t.announceList(),
485         }
486 }
487
488 func (t *Torrent) bytesLeft() (left int64) {
489         for i := 0; i < t.numPieces(); i++ {
490                 left += int64(t.pieces[i].bytesLeft())
491         }
492         return
493 }
494
495 // Bytes left to give in tracker announces.
496 func (t *Torrent) bytesLeftAnnounce() uint64 {
497         if t.haveInfo() {
498                 return uint64(t.bytesLeft())
499         } else {
500                 return math.MaxUint64
501         }
502 }
503
504 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
505         if t.pieceComplete(piece) {
506                 return false
507         }
508         if t.pieceAllDirty(piece) {
509                 return false
510         }
511         return t.pieces[piece].hasDirtyChunks()
512 }
513
514 func (t *Torrent) usualPieceSize() int {
515         return int(t.info.PieceLength)
516 }
517
518 func (t *Torrent) lastPieceSize() int {
519         return int(t.pieceLength(t.numPieces() - 1))
520 }
521
522 func (t *Torrent) numPieces() int {
523         return t.info.NumPieces()
524 }
525
526 func (t *Torrent) numPiecesCompleted() (num int) {
527         return t.completedPieces.Len()
528 }
529
530 // Safe to call with or without client lock.
531 func (t *Torrent) isClosed() bool {
532         select {
533         case <-t.closing:
534                 return true
535         default:
536                 return false
537         }
538 }
539
540 func (t *Torrent) close() (err error) {
541         if t.isClosed() {
542                 return
543         }
544         t.ceaseNetworking()
545         close(t.closing)
546         if c, ok := t.storage.(io.Closer); ok {
547                 c.Close()
548         }
549         for _, conn := range t.conns {
550                 conn.Close()
551         }
552         t.pieceStateChanges.Close()
553         return
554 }
555
556 func (t *Torrent) requestOffset(r request) int64 {
557         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
558 }
559
560 // Return the request that would include the given offset into the torrent
561 // data. Returns !ok if there is no such request.
562 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
563         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
564 }
565
566 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
567         tr := perf.NewTimer()
568
569         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
570         if err == nil && n != len(data) {
571                 err = io.ErrShortWrite
572         }
573         if err == nil {
574                 tr.Stop("write chunk")
575         }
576         return
577 }
578
579 func (t *Torrent) bitfield() (bf []bool) {
580         bf = make([]bool, t.numPieces())
581         t.completedPieces.IterTyped(func(piece int) (again bool) {
582                 bf[piece] = true
583                 return true
584         })
585         return
586 }
587
588 func (t *Torrent) validOutgoingRequest(r request) bool {
589         if r.Index >= pp.Integer(t.info.NumPieces()) {
590                 return false
591         }
592         if r.Begin%t.chunkSize != 0 {
593                 return false
594         }
595         if r.Length > t.chunkSize {
596                 return false
597         }
598         pieceLength := t.pieceLength(int(r.Index))
599         if r.Begin+r.Length > pieceLength {
600                 return false
601         }
602         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
603 }
604
605 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
606         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
607         var cs chunkSpec
608         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
609                 cs.Length = left
610                 if cs.Length > t.chunkSize {
611                         cs.Length = t.chunkSize
612                 }
613                 css = append(css, cs)
614                 cs.Begin += cs.Length
615         }
616         return
617 }
618
619 func (t *Torrent) pieceNumChunks(piece int) int {
620         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
621 }
622
623 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
624         t.pieces[pieceIndex].DirtyChunks.Clear()
625 }
626
627 type Peer struct {
628         Id     [20]byte
629         IP     net.IP
630         Port   int
631         Source peerSource
632         // Peer is known to support encryption.
633         SupportsEncryption bool
634 }
635
636 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
637         if piece < 0 || piece >= t.info.NumPieces() {
638                 return
639         }
640         if piece == t.numPieces()-1 {
641                 len_ = pp.Integer(t.length % t.info.PieceLength)
642         }
643         if len_ == 0 {
644                 len_ = pp.Integer(t.info.PieceLength)
645         }
646         return
647 }
648
649 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
650         hash := pieceHash.New()
651         p := &t.pieces[piece]
652         p.waitNoPendingWrites()
653         ip := t.info.Piece(piece)
654         pl := ip.Length()
655         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
656         if n == pl {
657                 missinggo.CopyExact(&ret, hash.Sum(nil))
658                 return
659         }
660         if err != io.ErrUnexpectedEOF {
661                 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
662         }
663         return
664 }
665
666 func (t *Torrent) haveAllPieces() bool {
667         if !t.haveInfo() {
668                 return false
669         }
670         return t.completedPieces.Len() == t.numPieces()
671 }
672
673 func (t *Torrent) haveAnyPieces() bool {
674         for i := range t.pieces {
675                 if t.pieceComplete(i) {
676                         return true
677                 }
678         }
679         return false
680 }
681
682 func (t *Torrent) havePiece(index int) bool {
683         return t.haveInfo() && t.pieceComplete(index)
684 }
685
686 func (t *Torrent) haveChunk(r request) (ret bool) {
687         // defer func() {
688         //      log.Println("have chunk", r, ret)
689         // }()
690         if !t.haveInfo() {
691                 return false
692         }
693         if t.pieceComplete(int(r.Index)) {
694                 return true
695         }
696         p := &t.pieces[r.Index]
697         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
698 }
699
700 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
701         return int(cs.Begin / chunkSize)
702 }
703
704 func (t *Torrent) wantPiece(r request) bool {
705         if !t.wantPieceIndex(int(r.Index)) {
706                 return false
707         }
708         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
709                 return true
710         }
711         // TODO: What about pieces that were wanted, but aren't now, and aren't
712         // completed either? That used to be done here.
713         return false
714 }
715
716 func (t *Torrent) wantPieceIndex(index int) bool {
717         if !t.haveInfo() {
718                 return false
719         }
720         p := &t.pieces[index]
721         if p.QueuedForHash {
722                 return false
723         }
724         if p.Hashing {
725                 return false
726         }
727         if t.pieceComplete(index) {
728                 return false
729         }
730         if t.pendingPieces.Contains(index) {
731                 return true
732         }
733         return !t.forReaderOffsetPieces(func(begin, end int) bool {
734                 return index < begin || index >= end
735         })
736 }
737
738 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
739         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
740                 for i := begin; begin < end; i++ {
741                         if !f(i) {
742                                 return false
743                         }
744                 }
745                 return true
746         })
747 }
748
749 func (t *Torrent) connHasWantedPieces(c *connection) bool {
750         return !c.pieceRequestOrder.IsEmpty()
751 }
752
753 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
754         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
755                 pieces = append(pieces, int(i))
756         }
757         return
758 }
759
760 func (t *Torrent) worstBadConn(cl *Client) *connection {
761         wcs := t.worstConns(cl)
762         heap.Init(wcs)
763         for wcs.Len() != 0 {
764                 c := heap.Pop(wcs).(*connection)
765                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
766                         return c
767                 }
768                 if wcs.Len() >= (socketsPerTorrent+1)/2 {
769                         // Give connections 1 minute to prove themselves.
770                         if time.Since(c.completedHandshake) > time.Minute {
771                                 return c
772                         }
773                 }
774         }
775         return nil
776 }
777
778 type PieceStateChange struct {
779         Index int
780         PieceState
781 }
782
783 func (t *Torrent) publishPieceChange(piece int) {
784         cur := t.pieceState(piece)
785         p := &t.pieces[piece]
786         if cur != p.PublicPieceState {
787                 p.PublicPieceState = cur
788                 t.pieceStateChanges.Publish(PieceStateChange{
789                         piece,
790                         cur,
791                 })
792         }
793 }
794
795 func (t *Torrent) pieceNumPendingChunks(piece int) int {
796         if t.pieceComplete(piece) {
797                 return 0
798         }
799         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
800 }
801
802 func (t *Torrent) pieceAllDirty(piece int) bool {
803         return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
804 }
805
806 func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
807         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
808                 if begin < end {
809                         if !f(begin) {
810                                 return false
811                         }
812                 }
813                 return true
814         })
815 }
816
817 func (t *Torrent) readersChanged() {
818         t.updatePiecePriorities()
819 }
820
821 func (t *Torrent) maybeNewConns() {
822         // Tickle the accept routine.
823         t.cl.event.Broadcast()
824         t.openNewConns()
825 }
826
827 func (t *Torrent) piecePriorityChanged(piece int) {
828         for _, c := range t.conns {
829                 c.updatePiecePriority(piece)
830         }
831         t.maybeNewConns()
832         t.publishPieceChange(piece)
833 }
834
835 func (t *Torrent) updatePiecePriority(piece int) bool {
836         p := &t.pieces[piece]
837         newPrio := t.piecePriorityUncached(piece)
838         if newPrio == p.priority {
839                 return false
840         }
841         p.priority = newPrio
842         return true
843 }
844
845 // Update all piece priorities in one hit. This function should have the same
846 // output as updatePiecePriority, but across all pieces.
847 func (t *Torrent) updatePiecePriorities() {
848         newPrios := make([]piecePriority, t.numPieces())
849         t.pendingPieces.IterTyped(func(piece int) (more bool) {
850                 newPrios[piece] = PiecePriorityNormal
851                 return true
852         })
853         t.forReaderOffsetPieces(func(begin, end int) (next bool) {
854                 if begin < end {
855                         newPrios[begin].Raise(PiecePriorityNow)
856                 }
857                 for i := begin + 1; i < end; i++ {
858                         newPrios[i].Raise(PiecePriorityReadahead)
859                 }
860                 return true
861         })
862         t.completedPieces.IterTyped(func(piece int) (more bool) {
863                 newPrios[piece] = PiecePriorityNone
864                 return true
865         })
866         for i, prio := range newPrios {
867                 if prio != t.pieces[i].priority {
868                         t.pieces[i].priority = prio
869                         t.piecePriorityChanged(i)
870                 }
871         }
872 }
873
874 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
875         if off >= t.length {
876                 return
877         }
878         if off < 0 {
879                 size += off
880                 off = 0
881         }
882         if size <= 0 {
883                 return
884         }
885         begin = int(off / t.info.PieceLength)
886         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
887         if end > t.info.NumPieces() {
888                 end = t.info.NumPieces()
889         }
890         return
891 }
892
893 // Returns true if all iterations complete without breaking.
894 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
895         // There's an oppurtunity here to build a map of beginning pieces, and a
896         // bitmap of the rest. I wonder if it's worth the allocation overhead.
897         for r := range t.readers {
898                 r.mu.Lock()
899                 pos, readahead := r.pos, r.readahead
900                 r.mu.Unlock()
901                 if readahead < 1 {
902                         readahead = 1
903                 }
904                 begin, end := t.byteRegionPieces(pos, readahead)
905                 if begin >= end {
906                         continue
907                 }
908                 if !f(begin, end) {
909                         return false
910                 }
911         }
912         return true
913 }
914
915 func (t *Torrent) piecePriority(piece int) piecePriority {
916         if !t.haveInfo() {
917                 return PiecePriorityNone
918         }
919         return t.pieces[piece].priority
920 }
921
922 func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
923         ret = PiecePriorityNone
924         if t.pieceComplete(piece) {
925                 return
926         }
927         if t.pendingPieces.Contains(piece) {
928                 ret = PiecePriorityNormal
929         }
930         raiseRet := ret.Raise
931         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
932                 if piece == begin {
933                         raiseRet(PiecePriorityNow)
934                 }
935                 if begin <= piece && piece < end {
936                         raiseRet(PiecePriorityReadahead)
937                 }
938                 return true
939         })
940         return
941 }
942
943 func (t *Torrent) pendPiece(piece int) {
944         if t.pendingPieces.Contains(piece) {
945                 return
946         }
947         if t.havePiece(piece) {
948                 return
949         }
950         t.pendingPieces.Add(piece)
951         if !t.updatePiecePriority(piece) {
952                 return
953         }
954         t.piecePriorityChanged(piece)
955 }
956
957 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
958         return t.completedPieces.Copy()
959 }
960
961 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
962         t.pendingPieces.Sub(unpend)
963         t.updatePiecePriorities()
964 }
965
966 func (t *Torrent) pendPieceRange(begin, end int) {
967         for i := begin; i < end; i++ {
968                 t.pendPiece(i)
969         }
970 }
971
972 func (t *Torrent) unpendPieceRange(begin, end int) {
973         var bm bitmap.Bitmap
974         bm.AddRange(begin, end)
975         t.unpendPieces(&bm)
976 }
977
978 func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
979         if !c.PeerHasPiece(piece) {
980                 return true
981         }
982         chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
983         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
984                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
985                 return c.Request(req)
986         })
987 }
988
989 func (t *Torrent) pendRequest(req request) {
990         ci := chunkIndex(req.chunkSpec, t.chunkSize)
991         t.pieces[req.Index].pendChunkIndex(ci)
992 }
993
994 func (t *Torrent) pieceChanged(piece int) {
995         t.cl.pieceChanged(t, piece)
996 }
997
998 func (t *Torrent) openNewConns() {
999         t.cl.openNewConns(t)
1000 }
1001
1002 func (t *Torrent) getConnPieceInclination() []int {
1003         _ret := t.connPieceInclinationPool.Get()
1004         if _ret == nil {
1005                 pieceInclinationsNew.Add(1)
1006                 return rand.Perm(t.numPieces())
1007         }
1008         pieceInclinationsReused.Add(1)
1009         return _ret.([]int)
1010 }
1011
1012 func (t *Torrent) putPieceInclination(pi []int) {
1013         t.connPieceInclinationPool.Put(pi)
1014         pieceInclinationsPut.Add(1)
1015 }
1016
1017 func (t *Torrent) updatePieceCompletion(piece int) {
1018         pcu := t.pieceCompleteUncached(piece)
1019         changed := t.completedPieces.Get(piece) != pcu
1020         t.completedPieces.Set(piece, pcu)
1021         if changed {
1022                 t.pieceChanged(piece)
1023         }
1024 }
1025
1026 // Non-blocking read. Client lock is not required.
1027 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1028         p := &t.pieces[off/t.info.PieceLength]
1029         p.waitNoPendingWrites()
1030         return p.Storage().ReadAt(b, off-p.Info().Offset())
1031 }
1032
1033 func (t *Torrent) updateAllPieceCompletions() {
1034         for i := range iter.N(t.numPieces()) {
1035                 t.updatePieceCompletion(i)
1036         }
1037 }