]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Tidy up bytesLeft calculations
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "expvar"
6         "fmt"
7         "io"
8         "log"
9         "math"
10         "math/rand"
11         "net"
12         "sort"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/missinggo"
17         "github.com/anacrolix/missinggo/bitmap"
18         "github.com/anacrolix/missinggo/itertools"
19         "github.com/anacrolix/missinggo/perf"
20         "github.com/anacrolix/missinggo/pubsub"
21         "github.com/bradfitz/iter"
22
23         "github.com/anacrolix/torrent/bencode"
24         "github.com/anacrolix/torrent/metainfo"
25         pp "github.com/anacrolix/torrent/peer_protocol"
26 )
27
28 func (t *torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
29         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
30 }
31
32 type peersKey struct {
33         IPBytes string
34         Port    int
35 }
36
37 // Maintains state of torrent within a Client.
38 type torrent struct {
39         cl *Client
40
41         closing chan struct{}
42
43         // Closed when no more network activity is desired. This includes
44         // announcing, and communicating with peers.
45         ceasingNetworking chan struct{}
46
47         InfoHash InfoHash
48         Pieces   []piece
49         // Values are the piece indices that changed.
50         pieceStateChanges *pubsub.PubSub
51         chunkSize         pp.Integer
52         // Total length of the torrent in bytes. Stored because it's not O(1) to
53         // get this from the info dict.
54         length int64
55
56         data Data
57
58         // The info dict. Nil if we don't have it (yet).
59         Info *metainfo.Info
60         // Active peer connections, running message stream loops.
61         Conns []*connection
62         // Set of addrs to which we're attempting to connect. Connections are
63         // half-open until all handshakes are completed.
64         HalfOpen map[string]struct{}
65
66         // Reserve of peers to connect to. A peer can be both here and in the
67         // active connections if were told about the peer after connecting with
68         // them. That encourages us to reconnect to peers that are well known.
69         Peers     map[peersKey]Peer
70         wantPeers sync.Cond
71
72         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
73         // mirror their respective URLs from the announce-list metainfo key.
74         Trackers []trackerTier
75         // Name used if the info name isn't available.
76         displayName string
77         // The bencoded bytes of the info dict.
78         MetaData []byte
79         // Each element corresponds to the 16KiB metadata pieces. If true, we have
80         // received that piece.
81         metadataHave []bool
82
83         // Closed when .Info is set.
84         gotMetainfo chan struct{}
85
86         readers map[*Reader]struct{}
87
88         pendingPieces   bitmap.Bitmap
89         completedPieces bitmap.Bitmap
90
91         connPieceInclinationPool sync.Pool
92 }
93
94 var (
95         pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
96         pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
97         pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
98 )
99
100 func (t *torrent) setDisplayName(dn string) {
101         t.displayName = dn
102 }
103
104 func (t *torrent) pieceComplete(piece int) bool {
105         return t.completedPieces.Get(piece)
106 }
107
108 func (t *torrent) pieceCompleteUncached(piece int) bool {
109         // TODO: This is called when setting metadata, and before storage is
110         // assigned, which doesn't seem right.
111         return t.data != nil && t.data.PieceComplete(piece)
112 }
113
114 func (t *torrent) numConnsUnchoked() (num int) {
115         for _, c := range t.Conns {
116                 if !c.PeerChoked {
117                         num++
118                 }
119         }
120         return
121 }
122
123 // There's a connection to that address already.
124 func (t *torrent) addrActive(addr string) bool {
125         if _, ok := t.HalfOpen[addr]; ok {
126                 return true
127         }
128         for _, c := range t.Conns {
129                 if c.remoteAddr().String() == addr {
130                         return true
131                 }
132         }
133         return false
134 }
135
136 func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
137         wcs = &worstConns{
138                 c:  make([]*connection, 0, len(t.Conns)),
139                 t:  t,
140                 cl: cl,
141         }
142         for _, c := range t.Conns {
143                 if !c.closed.IsSet() {
144                         wcs.c = append(wcs.c, c)
145                 }
146         }
147         return
148 }
149
150 func (t *torrent) ceaseNetworking() {
151         select {
152         case <-t.ceasingNetworking:
153                 return
154         default:
155         }
156         close(t.ceasingNetworking)
157         for _, c := range t.Conns {
158                 c.Close()
159         }
160 }
161
162 func (t *torrent) addPeer(p Peer, cl *Client) {
163         cl.openNewConns(t)
164         if len(t.Peers) >= torrentPeersHighWater {
165                 return
166         }
167         key := peersKey{string(p.IP), p.Port}
168         if _, ok := t.Peers[key]; ok {
169                 return
170         }
171         t.Peers[key] = p
172         peersAddedBySource.Add(string(p.Source), 1)
173         cl.openNewConns(t)
174
175 }
176
177 func (t *torrent) invalidateMetadata() {
178         t.MetaData = nil
179         t.metadataHave = nil
180         t.Info = nil
181 }
182
183 func (t *torrent) saveMetadataPiece(index int, data []byte) {
184         if t.haveInfo() {
185                 return
186         }
187         if index >= len(t.metadataHave) {
188                 log.Printf("%s: ignoring metadata piece %d", t, index)
189                 return
190         }
191         copy(t.MetaData[(1<<14)*index:], data)
192         t.metadataHave[index] = true
193 }
194
195 func (t *torrent) metadataPieceCount() int {
196         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
197 }
198
199 func (t *torrent) haveMetadataPiece(piece int) bool {
200         if t.haveInfo() {
201                 return (1<<14)*piece < len(t.MetaData)
202         } else {
203                 return piece < len(t.metadataHave) && t.metadataHave[piece]
204         }
205 }
206
207 func (t *torrent) metadataSizeKnown() bool {
208         return t.MetaData != nil
209 }
210
211 func (t *torrent) metadataSize() int {
212         return len(t.MetaData)
213 }
214
215 func infoPieceHashes(info *metainfo.Info) (ret []string) {
216         for i := 0; i < len(info.Pieces); i += 20 {
217                 ret = append(ret, string(info.Pieces[i:i+20]))
218         }
219         return
220 }
221
222 // Called when metadata for a torrent becomes available.
223 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
224         err = validateInfo(md)
225         if err != nil {
226                 err = fmt.Errorf("bad info: %s", err)
227                 return
228         }
229         t.Info = md
230         t.length = 0
231         for _, f := range t.Info.UpvertedFiles() {
232                 t.length += f.Length
233         }
234         t.MetaData = infoBytes
235         t.metadataHave = nil
236         hashes := infoPieceHashes(md)
237         t.Pieces = make([]piece, len(hashes))
238         for i, hash := range hashes {
239                 piece := &t.Pieces[i]
240                 piece.t = t
241                 piece.index = i
242                 piece.noPendingWrites.L = &piece.pendingWritesMutex
243                 missinggo.CopyExact(piece.Hash[:], hash)
244         }
245         for _, conn := range t.Conns {
246                 if err := conn.setNumPieces(t.numPieces()); err != nil {
247                         log.Printf("closing connection: %s", err)
248                         conn.Close()
249                 }
250         }
251         return
252 }
253
254 func (t *torrent) setStorage(td Data) {
255         if t.data != nil {
256                 t.data.Close()
257         }
258         t.data = td
259         for i := range t.Pieces {
260                 t.updatePieceCompletion(i)
261                 t.Pieces[i].QueuedForHash = true
262         }
263         go func() {
264                 for i := range t.Pieces {
265                         t.verifyPiece(i)
266                 }
267         }()
268 }
269
270 func (t *torrent) verifyPiece(piece int) {
271         t.cl.verifyPiece(t, piece)
272 }
273
274 func (t *torrent) haveAllMetadataPieces() bool {
275         if t.haveInfo() {
276                 return true
277         }
278         if t.metadataHave == nil {
279                 return false
280         }
281         for _, have := range t.metadataHave {
282                 if !have {
283                         return false
284                 }
285         }
286         return true
287 }
288
289 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
290         if t.haveInfo() {
291                 // We already know the correct metadata size.
292                 return
293         }
294         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
295                 log.Printf("received bad metadata size: %d", bytes)
296                 return
297         }
298         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
299                 return
300         }
301         t.MetaData = make([]byte, bytes)
302         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
303         for _, c := range t.Conns {
304                 cl.requestPendingMetadata(t, c)
305         }
306
307 }
308
309 // The current working name for the torrent. Either the name in the info dict,
310 // or a display name given such as by the dn value in a magnet link, or "".
311 func (t *torrent) Name() string {
312         if t.haveInfo() {
313                 return t.Info.Name
314         }
315         return t.displayName
316 }
317
318 func (t *torrent) pieceState(index int) (ret PieceState) {
319         p := &t.Pieces[index]
320         ret.Priority = t.piecePriority(index)
321         if t.pieceComplete(index) {
322                 ret.Complete = true
323         }
324         if p.QueuedForHash || p.Hashing {
325                 ret.Checking = true
326         }
327         if !ret.Complete && t.piecePartiallyDownloaded(index) {
328                 ret.Partial = true
329         }
330         return
331 }
332
333 func (t *torrent) metadataPieceSize(piece int) int {
334         return metadataPieceSize(len(t.MetaData), piece)
335 }
336
337 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
338         d := map[string]int{
339                 "msg_type": msgType,
340                 "piece":    piece,
341         }
342         if data != nil {
343                 d["total_size"] = len(t.MetaData)
344         }
345         p, err := bencode.Marshal(d)
346         if err != nil {
347                 panic(err)
348         }
349         return pp.Message{
350                 Type:            pp.Extended,
351                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
352                 ExtendedPayload: append(p, data...),
353         }
354 }
355
356 func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
357         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
358                 ret = append(ret, PieceStateRun{
359                         PieceState: el.(PieceState),
360                         Length:     int(count),
361                 })
362         })
363         for index := range t.Pieces {
364                 rle.Append(t.pieceState(index), 1)
365         }
366         rle.Flush()
367         return
368 }
369
370 // Produces a small string representing a PieceStateRun.
371 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
372         ret = fmt.Sprintf("%d", psr.Length)
373         ret += func() string {
374                 switch psr.Priority {
375                 case PiecePriorityNext:
376                         return "N"
377                 case PiecePriorityNormal:
378                         return "."
379                 case PiecePriorityReadahead:
380                         return "R"
381                 case PiecePriorityNow:
382                         return "!"
383                 default:
384                         return ""
385                 }
386         }()
387         if psr.Checking {
388                 ret += "H"
389         }
390         if psr.Partial {
391                 ret += "P"
392         }
393         if psr.Complete {
394                 ret += "C"
395         }
396         return
397 }
398
399 func (t *torrent) writeStatus(w io.Writer, cl *Client) {
400         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
401         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
402         if !t.haveInfo() {
403                 fmt.Fprintf(w, "Metadata have: ")
404                 for _, h := range t.metadataHave {
405                         fmt.Fprintf(w, "%c", func() rune {
406                                 if h {
407                                         return 'H'
408                                 } else {
409                                         return '.'
410                                 }
411                         }())
412                 }
413                 fmt.Fprintln(w)
414         }
415         fmt.Fprintf(w, "Piece length: %s\n", func() string {
416                 if t.haveInfo() {
417                         return fmt.Sprint(t.usualPieceSize())
418                 } else {
419                         return "?"
420                 }
421         }())
422         if t.haveInfo() {
423                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
424                 fmt.Fprint(w, "Piece States:")
425                 for _, psr := range t.pieceStateRuns() {
426                         w.Write([]byte(" "))
427                         w.Write([]byte(pieceStateRunStatusChars(psr)))
428                 }
429                 fmt.Fprintln(w)
430         }
431         fmt.Fprintf(w, "Reader Pieces:")
432         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
433                 fmt.Fprintf(w, " %d:%d", begin, end)
434                 return true
435         })
436         fmt.Fprintln(w)
437         fmt.Fprintf(w, "Trackers: ")
438         for _, tier := range t.Trackers {
439                 for _, tr := range tier {
440                         fmt.Fprintf(w, "%q ", tr)
441                 }
442         }
443         fmt.Fprintf(w, "\n")
444         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
445         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
446         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
447         sort.Sort(&worstConns{
448                 c:  t.Conns,
449                 t:  t,
450                 cl: cl,
451         })
452         for i, c := range t.Conns {
453                 fmt.Fprintf(w, "%2d. ", i+1)
454                 c.WriteStatus(w, t)
455         }
456 }
457
458 func (t *torrent) String() string {
459         s := t.Name()
460         if s == "" {
461                 s = fmt.Sprintf("%x", t.InfoHash)
462         }
463         return s
464 }
465
466 func (t *torrent) haveInfo() bool {
467         return t.Info != nil
468 }
469
470 // TODO: Include URIs that weren't converted to tracker clients.
471 func (t *torrent) announceList() (al [][]string) {
472         missinggo.CastSlice(&al, t.Trackers)
473         return
474 }
475
476 // Returns a run-time generated MetaInfo that includes the info bytes and
477 // announce-list as currently known to the client.
478 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
479         if t.MetaData == nil {
480                 panic("info bytes not set")
481         }
482         return &metainfo.MetaInfo{
483                 Info: metainfo.InfoEx{
484                         Info:  *t.Info,
485                         Bytes: t.MetaData,
486                 },
487                 CreationDate: time.Now().Unix(),
488                 Comment:      "dynamic metainfo from client",
489                 CreatedBy:    "go.torrent",
490                 AnnounceList: t.announceList(),
491         }
492 }
493
494 func (t *torrent) bytesLeft() (left int64) {
495         for i := 0; i < t.numPieces(); i++ {
496                 left += int64(t.Pieces[i].bytesLeft())
497         }
498         return
499 }
500
501 // Bytes left to give in tracker announces.
502 func (t *torrent) bytesLeftAnnounce() uint64 {
503         if t.haveInfo() {
504                 return uint64(t.bytesLeft())
505         } else {
506                 return math.MaxUint64
507         }
508 }
509
510 func (t *torrent) piecePartiallyDownloaded(piece int) bool {
511         if t.pieceComplete(piece) {
512                 return false
513         }
514         if t.pieceAllDirty(piece) {
515                 return false
516         }
517         return t.Pieces[piece].hasDirtyChunks()
518 }
519
520 func numChunksForPiece(chunkSize int, pieceSize int) int {
521         return (pieceSize + chunkSize - 1) / chunkSize
522 }
523
524 func (t *torrent) usualPieceSize() int {
525         return int(t.Info.PieceLength)
526 }
527
528 func (t *torrent) lastPieceSize() int {
529         return int(t.pieceLength(t.numPieces() - 1))
530 }
531
532 func (t *torrent) numPieces() int {
533         return t.Info.NumPieces()
534 }
535
536 func (t *torrent) numPiecesCompleted() (num int) {
537         return t.completedPieces.Len()
538 }
539
540 // Safe to call with or without client lock.
541 func (t *torrent) isClosed() bool {
542         select {
543         case <-t.closing:
544                 return true
545         default:
546                 return false
547         }
548 }
549
550 func (t *torrent) close() (err error) {
551         if t.isClosed() {
552                 return
553         }
554         t.ceaseNetworking()
555         close(t.closing)
556         if c, ok := t.data.(io.Closer); ok {
557                 c.Close()
558         }
559         for _, conn := range t.Conns {
560                 conn.Close()
561         }
562         t.pieceStateChanges.Close()
563         return
564 }
565
566 func (t *torrent) requestOffset(r request) int64 {
567         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
568 }
569
570 // Return the request that would include the given offset into the torrent
571 // data. Returns !ok if there is no such request.
572 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
573         return torrentOffsetRequest(t.length, t.Info.PieceLength, int64(t.chunkSize), off)
574 }
575
576 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
577         tr := perf.NewTimer()
578         n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
579         if err == nil && n != len(data) {
580                 err = io.ErrShortWrite
581         }
582         if err == nil {
583                 tr.Stop("write chunk")
584         }
585         return
586 }
587
588 func (t *torrent) bitfield() (bf []bool) {
589         bf = make([]bool, t.numPieces())
590         t.completedPieces.IterTyped(func(piece int) (again bool) {
591                 bf[piece] = true
592                 return true
593         })
594         return
595 }
596
597 func (t *torrent) validOutgoingRequest(r request) bool {
598         if r.Index >= pp.Integer(t.Info.NumPieces()) {
599                 return false
600         }
601         if r.Begin%t.chunkSize != 0 {
602                 return false
603         }
604         if r.Length > t.chunkSize {
605                 return false
606         }
607         pieceLength := t.pieceLength(int(r.Index))
608         if r.Begin+r.Length > pieceLength {
609                 return false
610         }
611         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
612 }
613
614 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
615         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
616         var cs chunkSpec
617         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
618                 cs.Length = left
619                 if cs.Length > t.chunkSize {
620                         cs.Length = t.chunkSize
621                 }
622                 css = append(css, cs)
623                 cs.Begin += cs.Length
624         }
625         return
626 }
627
628 func (t *torrent) pieceNumChunks(piece int) int {
629         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
630 }
631
632 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
633         t.Pieces[pieceIndex].DirtyChunks.Clear()
634 }
635
636 type Peer struct {
637         Id     [20]byte
638         IP     net.IP
639         Port   int
640         Source peerSource
641         // Peer is known to support encryption.
642         SupportsEncryption bool
643 }
644
645 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
646         if piece < 0 || piece >= t.Info.NumPieces() {
647                 return
648         }
649         if int(piece) == t.numPieces()-1 {
650                 len_ = pp.Integer(t.length % t.Info.PieceLength)
651         }
652         if len_ == 0 {
653                 len_ = pp.Integer(t.Info.PieceLength)
654         }
655         return
656 }
657
658 func (t *torrent) hashPiece(piece int) (ret pieceSum) {
659         hash := pieceHash.New()
660         p := &t.Pieces[piece]
661         p.waitNoPendingWrites()
662         ip := t.Info.Piece(piece)
663         pl := ip.Length()
664         n, err := io.Copy(hash, io.NewSectionReader(t.data, ip.Offset(), pl))
665         if n == pl {
666                 missinggo.CopyExact(&ret, hash.Sum(nil))
667                 return
668         }
669         if err != io.ErrUnexpectedEOF {
670                 log.Printf("unexpected error hashing piece with %T: %s", t.data, err)
671         }
672         return
673 }
674
675 func (t *torrent) haveAllPieces() bool {
676         if !t.haveInfo() {
677                 return false
678         }
679         return t.completedPieces.Len() == t.numPieces()
680 }
681
682 func (me *torrent) haveAnyPieces() bool {
683         for i := range me.Pieces {
684                 if me.pieceComplete(i) {
685                         return true
686                 }
687         }
688         return false
689 }
690
691 func (t *torrent) havePiece(index int) bool {
692         return t.haveInfo() && t.pieceComplete(index)
693 }
694
695 func (t *torrent) haveChunk(r request) (ret bool) {
696         // defer func() {
697         //      log.Println("have chunk", r, ret)
698         // }()
699         if !t.haveInfo() {
700                 return false
701         }
702         if t.pieceComplete(int(r.Index)) {
703                 return true
704         }
705         p := &t.Pieces[r.Index]
706         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
707 }
708
709 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
710         return int(cs.Begin / chunkSize)
711 }
712
713 // TODO: This should probably be called wantPiece.
714 func (t *torrent) wantChunk(r request) bool {
715         if !t.wantPiece(int(r.Index)) {
716                 return false
717         }
718         if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
719                 return true
720         }
721         // TODO: What about pieces that were wanted, but aren't now, and aren't
722         // completed either? That used to be done here.
723         return false
724 }
725
726 // TODO: This should be called wantPieceIndex.
727 func (t *torrent) wantPiece(index int) bool {
728         if !t.haveInfo() {
729                 return false
730         }
731         p := &t.Pieces[index]
732         if p.QueuedForHash {
733                 return false
734         }
735         if p.Hashing {
736                 return false
737         }
738         if t.pieceComplete(index) {
739                 return false
740         }
741         if t.pendingPieces.Contains(index) {
742                 return true
743         }
744         return !t.forReaderOffsetPieces(func(begin, end int) bool {
745                 return index < begin || index >= end
746         })
747 }
748
749 func (t *torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
750         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
751                 for i := begin; begin < end; i++ {
752                         if !f(i) {
753                                 return false
754                         }
755                 }
756                 return true
757         })
758 }
759
760 func (t *torrent) connHasWantedPieces(c *connection) bool {
761         return !c.pieceRequestOrder.IsEmpty()
762 }
763
764 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
765         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
766                 pieces = append(pieces, int(i))
767         }
768         return
769 }
770
771 func (t *torrent) worstBadConn(cl *Client) *connection {
772         wcs := t.worstConns(cl)
773         heap.Init(wcs)
774         for wcs.Len() != 0 {
775                 c := heap.Pop(wcs).(*connection)
776                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
777                         return c
778                 }
779                 if wcs.Len() >= (socketsPerTorrent+1)/2 {
780                         // Give connections 1 minute to prove themselves.
781                         if time.Since(c.completedHandshake) > time.Minute {
782                                 return c
783                         }
784                 }
785         }
786         return nil
787 }
788
789 type PieceStateChange struct {
790         Index int
791         PieceState
792 }
793
794 func (t *torrent) publishPieceChange(piece int) {
795         cur := t.pieceState(piece)
796         p := &t.Pieces[piece]
797         if cur != p.PublicPieceState {
798                 p.PublicPieceState = cur
799                 t.pieceStateChanges.Publish(PieceStateChange{
800                         piece,
801                         cur,
802                 })
803         }
804 }
805
806 func (t *torrent) pieceNumPendingChunks(piece int) int {
807         if t.pieceComplete(piece) {
808                 return 0
809         }
810         return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
811 }
812
813 func (t *torrent) pieceAllDirty(piece int) bool {
814         return t.Pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
815 }
816
817 func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
818         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
819                 if begin < end {
820                         if !f(begin) {
821                                 return false
822                         }
823                 }
824                 return true
825         })
826 }
827
828 func (t *torrent) readersChanged() {
829         t.updatePiecePriorities()
830 }
831
832 func (t *torrent) maybeNewConns() {
833         // Tickle the accept routine.
834         t.cl.event.Broadcast()
835         t.openNewConns()
836 }
837
838 func (t *torrent) piecePriorityChanged(piece int) {
839         for _, c := range t.Conns {
840                 c.updatePiecePriority(piece)
841         }
842         t.maybeNewConns()
843         t.publishPieceChange(piece)
844 }
845
846 func (t *torrent) updatePiecePriority(piece int) bool {
847         p := &t.Pieces[piece]
848         newPrio := t.piecePriorityUncached(piece)
849         if newPrio == p.priority {
850                 return false
851         }
852         p.priority = newPrio
853         return true
854 }
855
856 // Update all piece priorities in one hit. This function should have the same
857 // output as updatePiecePriority, but across all pieces.
858 func (t *torrent) updatePiecePriorities() {
859         newPrios := make([]piecePriority, t.numPieces())
860         t.pendingPieces.IterTyped(func(piece int) (more bool) {
861                 newPrios[piece] = PiecePriorityNormal
862                 return true
863         })
864         t.forReaderOffsetPieces(func(begin, end int) (next bool) {
865                 if begin < end {
866                         newPrios[begin].Raise(PiecePriorityNow)
867                 }
868                 for i := begin + 1; i < end; i++ {
869                         newPrios[i].Raise(PiecePriorityReadahead)
870                 }
871                 return true
872         })
873         t.completedPieces.IterTyped(func(piece int) (more bool) {
874                 newPrios[piece] = PiecePriorityNone
875                 return true
876         })
877         for i, prio := range newPrios {
878                 if prio != t.Pieces[i].priority {
879                         t.Pieces[i].priority = prio
880                         t.piecePriorityChanged(i)
881                 }
882         }
883 }
884
885 func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
886         if off >= t.length {
887                 return
888         }
889         if off < 0 {
890                 size += off
891                 off = 0
892         }
893         if size <= 0 {
894                 return
895         }
896         begin = int(off / t.Info.PieceLength)
897         end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
898         if end > t.Info.NumPieces() {
899                 end = t.Info.NumPieces()
900         }
901         return
902 }
903
904 // Returns true if all iterations complete without breaking.
905 func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
906         // There's an oppurtunity here to build a map of beginning pieces, and a
907         // bitmap of the rest. I wonder if it's worth the allocation overhead.
908         for r := range t.readers {
909                 r.mu.Lock()
910                 pos, readahead := r.pos, r.readahead
911                 r.mu.Unlock()
912                 if readahead < 1 {
913                         readahead = 1
914                 }
915                 begin, end := t.byteRegionPieces(pos, readahead)
916                 if begin >= end {
917                         continue
918                 }
919                 if !f(begin, end) {
920                         return false
921                 }
922         }
923         return true
924 }
925
926 func (t *torrent) piecePriority(piece int) piecePriority {
927         if !t.haveInfo() {
928                 return PiecePriorityNone
929         }
930         return t.Pieces[piece].priority
931 }
932
933 func (t *torrent) piecePriorityUncached(piece int) (ret piecePriority) {
934         ret = PiecePriorityNone
935         if t.pieceComplete(piece) {
936                 return
937         }
938         if t.pendingPieces.Contains(piece) {
939                 ret = PiecePriorityNormal
940         }
941         raiseRet := ret.Raise
942         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
943                 if piece == begin {
944                         raiseRet(PiecePriorityNow)
945                 }
946                 if begin <= piece && piece < end {
947                         raiseRet(PiecePriorityReadahead)
948                 }
949                 return true
950         })
951         return
952 }
953
954 func (t *torrent) pendPiece(piece int) {
955         if t.pendingPieces.Contains(piece) {
956                 return
957         }
958         if t.havePiece(piece) {
959                 return
960         }
961         t.pendingPieces.Add(piece)
962         if !t.updatePiecePriority(piece) {
963                 return
964         }
965         t.piecePriorityChanged(piece)
966 }
967
968 func (t *torrent) getCompletedPieces() (ret bitmap.Bitmap) {
969         return t.completedPieces.Copy()
970 }
971
972 func (t *torrent) unpendPieces(unpend *bitmap.Bitmap) {
973         t.pendingPieces.Sub(unpend)
974         t.updatePiecePriorities()
975 }
976
977 func (t *torrent) pendPieceRange(begin, end int) {
978         for i := begin; i < end; i++ {
979                 t.pendPiece(i)
980         }
981 }
982
983 func (t *torrent) unpendPieceRange(begin, end int) {
984         var bm bitmap.Bitmap
985         bm.AddRange(begin, end)
986         t.unpendPieces(&bm)
987 }
988
989 func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
990         if !c.PeerHasPiece(piece) {
991                 return true
992         }
993         chunkIndices := t.Pieces[piece].undirtiedChunkIndices().ToSortedSlice()
994         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
995                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
996                 return c.Request(req)
997         })
998 }
999
1000 func (t *torrent) pendRequest(req request) {
1001         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1002         t.Pieces[req.Index].pendChunkIndex(ci)
1003 }
1004
1005 func (t *torrent) pieceChanged(piece int) {
1006         t.cl.pieceChanged(t, piece)
1007 }
1008
1009 func (t *torrent) openNewConns() {
1010         t.cl.openNewConns(t)
1011 }
1012
1013 func (t *torrent) getConnPieceInclination() []int {
1014         _ret := t.connPieceInclinationPool.Get()
1015         if _ret == nil {
1016                 pieceInclinationsNew.Add(1)
1017                 return rand.Perm(t.numPieces())
1018         }
1019         pieceInclinationsReused.Add(1)
1020         return _ret.([]int)
1021 }
1022
1023 func (t *torrent) putPieceInclination(pi []int) {
1024         t.connPieceInclinationPool.Put(pi)
1025         pieceInclinationsPut.Add(1)
1026 }
1027
1028 func (t *torrent) updatePieceCompletion(piece int) {
1029         t.completedPieces.Set(piece, t.pieceCompleteUncached(piece))
1030 }
1031
1032 // Non-blocking read. Client lock is not required.
1033 func (t *torrent) readAt(b []byte, off int64) (n int, err error) {
1034         if off+int64(len(b)) > t.length {
1035                 b = b[:t.length-off]
1036         }
1037         for pi := off / t.Info.PieceLength; pi*t.Info.PieceLength < off+int64(len(b)); pi++ {
1038                 t.Pieces[pi].waitNoPendingWrites()
1039         }
1040         return t.data.ReadAt(b, off)
1041 }
1042
1043 func (t *torrent) updateAllPieceCompletions() {
1044         for i := range iter.N(t.numPieces()) {
1045                 t.updatePieceCompletion(i)
1046         }
1047 }