]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Make opening a torrent in storage an explicit method
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "expvar"
6         "fmt"
7         "io"
8         "log"
9         "math"
10         "math/rand"
11         "net"
12         "sort"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/missinggo"
17         "github.com/anacrolix/missinggo/bitmap"
18         "github.com/anacrolix/missinggo/itertools"
19         "github.com/anacrolix/missinggo/perf"
20         "github.com/anacrolix/missinggo/pubsub"
21         "github.com/bradfitz/iter"
22
23         "github.com/anacrolix/torrent/bencode"
24         "github.com/anacrolix/torrent/metainfo"
25         pp "github.com/anacrolix/torrent/peer_protocol"
26         "github.com/anacrolix/torrent/storage"
27 )
28
29 func (t *torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
30         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
31 }
32
33 type peersKey struct {
34         IPBytes string
35         Port    int
36 }
37
38 // Maintains state of torrent within a Client.
39 type torrent struct {
40         cl *Client
41
42         closing chan struct{}
43
44         // Closed when no more network activity is desired. This includes
45         // announcing, and communicating with peers.
46         ceasingNetworking chan struct{}
47
48         InfoHash metainfo.InfoHash
49         Pieces   []piece
50         // Values are the piece indices that changed.
51         pieceStateChanges *pubsub.PubSub
52         chunkSize         pp.Integer
53         // Total length of the torrent in bytes. Stored because it's not O(1) to
54         // get this from the info dict.
55         length int64
56
57         storageOpener storage.I
58         storage       storage.Torrent
59
60         // The info dict. Nil if we don't have it (yet).
61         Info *metainfo.InfoEx
62         // Active peer connections, running message stream loops.
63         Conns []*connection
64         // Set of addrs to which we're attempting to connect. Connections are
65         // half-open until all handshakes are completed.
66         HalfOpen map[string]struct{}
67
68         // Reserve of peers to connect to. A peer can be both here and in the
69         // active connections if were told about the peer after connecting with
70         // them. That encourages us to reconnect to peers that are well known.
71         Peers     map[peersKey]Peer
72         wantPeers sync.Cond
73
74         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
75         // mirror their respective URLs from the announce-list metainfo key.
76         Trackers []trackerTier
77         // Name used if the info name isn't available.
78         displayName string
79         // The bencoded bytes of the info dict.
80         MetaData []byte
81         // Each element corresponds to the 16KiB metadata pieces. If true, we have
82         // received that piece.
83         metadataHave []bool
84
85         // Closed when .Info is set.
86         gotMetainfo chan struct{}
87
88         readers map[*Reader]struct{}
89
90         pendingPieces   bitmap.Bitmap
91         completedPieces bitmap.Bitmap
92
93         connPieceInclinationPool sync.Pool
94 }
95
96 var (
97         pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
98         pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
99         pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
100 )
101
102 func (t *torrent) setDisplayName(dn string) {
103         t.displayName = dn
104 }
105
106 func (t *torrent) pieceComplete(piece int) bool {
107         return t.completedPieces.Get(piece)
108 }
109
110 func (t *torrent) pieceCompleteUncached(piece int) bool {
111         return t.Pieces[piece].Storage().GetIsComplete()
112 }
113
114 func (t *torrent) numConnsUnchoked() (num int) {
115         for _, c := range t.Conns {
116                 if !c.PeerChoked {
117                         num++
118                 }
119         }
120         return
121 }
122
123 // There's a connection to that address already.
124 func (t *torrent) addrActive(addr string) bool {
125         if _, ok := t.HalfOpen[addr]; ok {
126                 return true
127         }
128         for _, c := range t.Conns {
129                 if c.remoteAddr().String() == addr {
130                         return true
131                 }
132         }
133         return false
134 }
135
136 func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
137         wcs = &worstConns{
138                 c:  make([]*connection, 0, len(t.Conns)),
139                 t:  t,
140                 cl: cl,
141         }
142         for _, c := range t.Conns {
143                 if !c.closed.IsSet() {
144                         wcs.c = append(wcs.c, c)
145                 }
146         }
147         return
148 }
149
150 func (t *torrent) ceaseNetworking() {
151         select {
152         case <-t.ceasingNetworking:
153                 return
154         default:
155         }
156         close(t.ceasingNetworking)
157         for _, c := range t.Conns {
158                 c.Close()
159         }
160 }
161
162 func (t *torrent) addPeer(p Peer, cl *Client) {
163         cl.openNewConns(t)
164         if len(t.Peers) >= torrentPeersHighWater {
165                 return
166         }
167         key := peersKey{string(p.IP), p.Port}
168         if _, ok := t.Peers[key]; ok {
169                 return
170         }
171         t.Peers[key] = p
172         peersAddedBySource.Add(string(p.Source), 1)
173         cl.openNewConns(t)
174
175 }
176
177 func (t *torrent) invalidateMetadata() {
178         t.MetaData = nil
179         t.metadataHave = nil
180         t.Info = nil
181 }
182
183 func (t *torrent) saveMetadataPiece(index int, data []byte) {
184         if t.haveInfo() {
185                 return
186         }
187         if index >= len(t.metadataHave) {
188                 log.Printf("%s: ignoring metadata piece %d", t, index)
189                 return
190         }
191         copy(t.MetaData[(1<<14)*index:], data)
192         t.metadataHave[index] = true
193 }
194
195 func (t *torrent) metadataPieceCount() int {
196         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
197 }
198
199 func (t *torrent) haveMetadataPiece(piece int) bool {
200         if t.haveInfo() {
201                 return (1<<14)*piece < len(t.MetaData)
202         } else {
203                 return piece < len(t.metadataHave) && t.metadataHave[piece]
204         }
205 }
206
207 func (t *torrent) metadataSizeKnown() bool {
208         return t.MetaData != nil
209 }
210
211 func (t *torrent) metadataSize() int {
212         return len(t.MetaData)
213 }
214
215 func infoPieceHashes(info *metainfo.Info) (ret []string) {
216         for i := 0; i < len(info.Pieces); i += 20 {
217                 ret = append(ret, string(info.Pieces[i:i+20]))
218         }
219         return
220 }
221
222 // Called when metadata for a torrent becomes available.
223 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
224         err = validateInfo(md)
225         if err != nil {
226                 err = fmt.Errorf("bad info: %s", err)
227                 return
228         }
229         t.Info = &metainfo.InfoEx{
230                 Info:  *md,
231                 Bytes: infoBytes,
232                 Hash:  &t.InfoHash,
233         }
234         t.storage, err = t.storageOpener.OpenTorrent(t.Info)
235         if err != nil {
236                 return
237         }
238         t.length = 0
239         for _, f := range t.Info.UpvertedFiles() {
240                 t.length += f.Length
241         }
242         t.MetaData = infoBytes
243         t.metadataHave = nil
244         hashes := infoPieceHashes(md)
245         t.Pieces = make([]piece, len(hashes))
246         for i, hash := range hashes {
247                 piece := &t.Pieces[i]
248                 piece.t = t
249                 piece.index = i
250                 piece.noPendingWrites.L = &piece.pendingWritesMutex
251                 missinggo.CopyExact(piece.Hash[:], hash)
252         }
253         for _, conn := range t.Conns {
254                 if err := conn.setNumPieces(t.numPieces()); err != nil {
255                         log.Printf("closing connection: %s", err)
256                         conn.Close()
257                 }
258         }
259         for i := range t.Pieces {
260                 t.updatePieceCompletion(i)
261                 t.Pieces[i].QueuedForHash = true
262         }
263         go func() {
264                 for i := range t.Pieces {
265                         t.verifyPiece(i)
266                 }
267         }()
268         return
269 }
270
271 func (t *torrent) verifyPiece(piece int) {
272         t.cl.verifyPiece(t, piece)
273 }
274
275 func (t *torrent) haveAllMetadataPieces() bool {
276         if t.haveInfo() {
277                 return true
278         }
279         if t.metadataHave == nil {
280                 return false
281         }
282         for _, have := range t.metadataHave {
283                 if !have {
284                         return false
285                 }
286         }
287         return true
288 }
289
290 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
291         if t.haveInfo() {
292                 // We already know the correct metadata size.
293                 return
294         }
295         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
296                 log.Printf("received bad metadata size: %d", bytes)
297                 return
298         }
299         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
300                 return
301         }
302         t.MetaData = make([]byte, bytes)
303         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
304         for _, c := range t.Conns {
305                 cl.requestPendingMetadata(t, c)
306         }
307
308 }
309
310 // The current working name for the torrent. Either the name in the info dict,
311 // or a display name given such as by the dn value in a magnet link, or "".
312 func (t *torrent) Name() string {
313         if t.haveInfo() {
314                 return t.Info.Name
315         }
316         return t.displayName
317 }
318
319 func (t *torrent) pieceState(index int) (ret PieceState) {
320         p := &t.Pieces[index]
321         ret.Priority = t.piecePriority(index)
322         if t.pieceComplete(index) {
323                 ret.Complete = true
324         }
325         if p.QueuedForHash || p.Hashing {
326                 ret.Checking = true
327         }
328         if !ret.Complete && t.piecePartiallyDownloaded(index) {
329                 ret.Partial = true
330         }
331         return
332 }
333
334 func (t *torrent) metadataPieceSize(piece int) int {
335         return metadataPieceSize(len(t.MetaData), piece)
336 }
337
338 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
339         d := map[string]int{
340                 "msg_type": msgType,
341                 "piece":    piece,
342         }
343         if data != nil {
344                 d["total_size"] = len(t.MetaData)
345         }
346         p, err := bencode.Marshal(d)
347         if err != nil {
348                 panic(err)
349         }
350         return pp.Message{
351                 Type:            pp.Extended,
352                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
353                 ExtendedPayload: append(p, data...),
354         }
355 }
356
357 func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
358         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
359                 ret = append(ret, PieceStateRun{
360                         PieceState: el.(PieceState),
361                         Length:     int(count),
362                 })
363         })
364         for index := range t.Pieces {
365                 rle.Append(t.pieceState(index), 1)
366         }
367         rle.Flush()
368         return
369 }
370
371 // Produces a small string representing a PieceStateRun.
372 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
373         ret = fmt.Sprintf("%d", psr.Length)
374         ret += func() string {
375                 switch psr.Priority {
376                 case PiecePriorityNext:
377                         return "N"
378                 case PiecePriorityNormal:
379                         return "."
380                 case PiecePriorityReadahead:
381                         return "R"
382                 case PiecePriorityNow:
383                         return "!"
384                 default:
385                         return ""
386                 }
387         }()
388         if psr.Checking {
389                 ret += "H"
390         }
391         if psr.Partial {
392                 ret += "P"
393         }
394         if psr.Complete {
395                 ret += "C"
396         }
397         return
398 }
399
400 func (t *torrent) writeStatus(w io.Writer, cl *Client) {
401         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
402         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
403         if !t.haveInfo() {
404                 fmt.Fprintf(w, "Metadata have: ")
405                 for _, h := range t.metadataHave {
406                         fmt.Fprintf(w, "%c", func() rune {
407                                 if h {
408                                         return 'H'
409                                 } else {
410                                         return '.'
411                                 }
412                         }())
413                 }
414                 fmt.Fprintln(w)
415         }
416         fmt.Fprintf(w, "Piece length: %s\n", func() string {
417                 if t.haveInfo() {
418                         return fmt.Sprint(t.usualPieceSize())
419                 } else {
420                         return "?"
421                 }
422         }())
423         if t.haveInfo() {
424                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
425                 fmt.Fprint(w, "Piece States:")
426                 for _, psr := range t.pieceStateRuns() {
427                         w.Write([]byte(" "))
428                         w.Write([]byte(pieceStateRunStatusChars(psr)))
429                 }
430                 fmt.Fprintln(w)
431         }
432         fmt.Fprintf(w, "Reader Pieces:")
433         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
434                 fmt.Fprintf(w, " %d:%d", begin, end)
435                 return true
436         })
437         fmt.Fprintln(w)
438         fmt.Fprintf(w, "Trackers: ")
439         for _, tier := range t.Trackers {
440                 for _, tr := range tier {
441                         fmt.Fprintf(w, "%q ", tr)
442                 }
443         }
444         fmt.Fprintf(w, "\n")
445         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
446         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
447         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
448         sort.Sort(&worstConns{
449                 c:  t.Conns,
450                 t:  t,
451                 cl: cl,
452         })
453         for i, c := range t.Conns {
454                 fmt.Fprintf(w, "%2d. ", i+1)
455                 c.WriteStatus(w, t)
456         }
457 }
458
459 func (t *torrent) String() string {
460         s := t.Name()
461         if s == "" {
462                 s = fmt.Sprintf("%x", t.InfoHash)
463         }
464         return s
465 }
466
467 func (t *torrent) haveInfo() bool {
468         return t.Info != nil
469 }
470
471 // TODO: Include URIs that weren't converted to tracker clients.
472 func (t *torrent) announceList() (al [][]string) {
473         missinggo.CastSlice(&al, t.Trackers)
474         return
475 }
476
477 // Returns a run-time generated MetaInfo that includes the info bytes and
478 // announce-list as currently known to the client.
479 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
480         if t.MetaData == nil {
481                 panic("info bytes not set")
482         }
483         return &metainfo.MetaInfo{
484                 Info:         *t.Info,
485                 CreationDate: time.Now().Unix(),
486                 Comment:      "dynamic metainfo from client",
487                 CreatedBy:    "go.torrent",
488                 AnnounceList: t.announceList(),
489         }
490 }
491
492 func (t *torrent) bytesLeft() (left int64) {
493         for i := 0; i < t.numPieces(); i++ {
494                 left += int64(t.Pieces[i].bytesLeft())
495         }
496         return
497 }
498
499 // Bytes left to give in tracker announces.
500 func (t *torrent) bytesLeftAnnounce() uint64 {
501         if t.haveInfo() {
502                 return uint64(t.bytesLeft())
503         } else {
504                 return math.MaxUint64
505         }
506 }
507
508 func (t *torrent) piecePartiallyDownloaded(piece int) bool {
509         if t.pieceComplete(piece) {
510                 return false
511         }
512         if t.pieceAllDirty(piece) {
513                 return false
514         }
515         return t.Pieces[piece].hasDirtyChunks()
516 }
517
518 func numChunksForPiece(chunkSize int, pieceSize int) int {
519         return (pieceSize + chunkSize - 1) / chunkSize
520 }
521
522 func (t *torrent) usualPieceSize() int {
523         return int(t.Info.PieceLength)
524 }
525
526 func (t *torrent) lastPieceSize() int {
527         return int(t.pieceLength(t.numPieces() - 1))
528 }
529
530 func (t *torrent) numPieces() int {
531         return t.Info.NumPieces()
532 }
533
534 func (t *torrent) numPiecesCompleted() (num int) {
535         return t.completedPieces.Len()
536 }
537
538 // Safe to call with or without client lock.
539 func (t *torrent) isClosed() bool {
540         select {
541         case <-t.closing:
542                 return true
543         default:
544                 return false
545         }
546 }
547
548 func (t *torrent) close() (err error) {
549         if t.isClosed() {
550                 return
551         }
552         t.ceaseNetworking()
553         close(t.closing)
554         if c, ok := t.storage.(io.Closer); ok {
555                 c.Close()
556         }
557         for _, conn := range t.Conns {
558                 conn.Close()
559         }
560         t.pieceStateChanges.Close()
561         return
562 }
563
564 func (t *torrent) requestOffset(r request) int64 {
565         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
566 }
567
568 // Return the request that would include the given offset into the torrent
569 // data. Returns !ok if there is no such request.
570 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
571         return torrentOffsetRequest(t.length, t.Info.PieceLength, int64(t.chunkSize), off)
572 }
573
574 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
575         tr := perf.NewTimer()
576
577         n, err := t.Pieces[piece].Storage().WriteAt(data, begin)
578         if err == nil && n != len(data) {
579                 err = io.ErrShortWrite
580         }
581         if err == nil {
582                 tr.Stop("write chunk")
583         }
584         return
585 }
586
587 func (t *torrent) bitfield() (bf []bool) {
588         bf = make([]bool, t.numPieces())
589         t.completedPieces.IterTyped(func(piece int) (again bool) {
590                 bf[piece] = true
591                 return true
592         })
593         return
594 }
595
596 func (t *torrent) validOutgoingRequest(r request) bool {
597         if r.Index >= pp.Integer(t.Info.NumPieces()) {
598                 return false
599         }
600         if r.Begin%t.chunkSize != 0 {
601                 return false
602         }
603         if r.Length > t.chunkSize {
604                 return false
605         }
606         pieceLength := t.pieceLength(int(r.Index))
607         if r.Begin+r.Length > pieceLength {
608                 return false
609         }
610         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
611 }
612
613 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
614         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
615         var cs chunkSpec
616         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
617                 cs.Length = left
618                 if cs.Length > t.chunkSize {
619                         cs.Length = t.chunkSize
620                 }
621                 css = append(css, cs)
622                 cs.Begin += cs.Length
623         }
624         return
625 }
626
627 func (t *torrent) pieceNumChunks(piece int) int {
628         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
629 }
630
631 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
632         t.Pieces[pieceIndex].DirtyChunks.Clear()
633 }
634
635 type Peer struct {
636         Id     [20]byte
637         IP     net.IP
638         Port   int
639         Source peerSource
640         // Peer is known to support encryption.
641         SupportsEncryption bool
642 }
643
644 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
645         if piece < 0 || piece >= t.Info.NumPieces() {
646                 return
647         }
648         if int(piece) == t.numPieces()-1 {
649                 len_ = pp.Integer(t.length % t.Info.PieceLength)
650         }
651         if len_ == 0 {
652                 len_ = pp.Integer(t.Info.PieceLength)
653         }
654         return
655 }
656
657 func (t *torrent) hashPiece(piece int) (ret pieceSum) {
658         hash := pieceHash.New()
659         p := &t.Pieces[piece]
660         p.waitNoPendingWrites()
661         ip := t.Info.Piece(piece)
662         pl := ip.Length()
663         n, err := io.Copy(hash, io.NewSectionReader(t.Pieces[piece].Storage(), 0, pl))
664         if n == pl {
665                 missinggo.CopyExact(&ret, hash.Sum(nil))
666                 return
667         }
668         if err != io.ErrUnexpectedEOF {
669                 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
670         }
671         return
672 }
673
674 func (t *torrent) haveAllPieces() bool {
675         if !t.haveInfo() {
676                 return false
677         }
678         return t.completedPieces.Len() == t.numPieces()
679 }
680
681 func (me *torrent) haveAnyPieces() bool {
682         for i := range me.Pieces {
683                 if me.pieceComplete(i) {
684                         return true
685                 }
686         }
687         return false
688 }
689
690 func (t *torrent) havePiece(index int) bool {
691         return t.haveInfo() && t.pieceComplete(index)
692 }
693
694 func (t *torrent) haveChunk(r request) (ret bool) {
695         // defer func() {
696         //      log.Println("have chunk", r, ret)
697         // }()
698         if !t.haveInfo() {
699                 return false
700         }
701         if t.pieceComplete(int(r.Index)) {
702                 return true
703         }
704         p := &t.Pieces[r.Index]
705         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
706 }
707
708 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
709         return int(cs.Begin / chunkSize)
710 }
711
712 // TODO: This should probably be called wantPiece.
713 func (t *torrent) wantChunk(r request) bool {
714         if !t.wantPiece(int(r.Index)) {
715                 return false
716         }
717         if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
718                 return true
719         }
720         // TODO: What about pieces that were wanted, but aren't now, and aren't
721         // completed either? That used to be done here.
722         return false
723 }
724
725 // TODO: This should be called wantPieceIndex.
726 func (t *torrent) wantPiece(index int) bool {
727         if !t.haveInfo() {
728                 return false
729         }
730         p := &t.Pieces[index]
731         if p.QueuedForHash {
732                 return false
733         }
734         if p.Hashing {
735                 return false
736         }
737         if t.pieceComplete(index) {
738                 return false
739         }
740         if t.pendingPieces.Contains(index) {
741                 return true
742         }
743         return !t.forReaderOffsetPieces(func(begin, end int) bool {
744                 return index < begin || index >= end
745         })
746 }
747
748 func (t *torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
749         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
750                 for i := begin; begin < end; i++ {
751                         if !f(i) {
752                                 return false
753                         }
754                 }
755                 return true
756         })
757 }
758
759 func (t *torrent) connHasWantedPieces(c *connection) bool {
760         return !c.pieceRequestOrder.IsEmpty()
761 }
762
763 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
764         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
765                 pieces = append(pieces, int(i))
766         }
767         return
768 }
769
770 func (t *torrent) worstBadConn(cl *Client) *connection {
771         wcs := t.worstConns(cl)
772         heap.Init(wcs)
773         for wcs.Len() != 0 {
774                 c := heap.Pop(wcs).(*connection)
775                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
776                         return c
777                 }
778                 if wcs.Len() >= (socketsPerTorrent+1)/2 {
779                         // Give connections 1 minute to prove themselves.
780                         if time.Since(c.completedHandshake) > time.Minute {
781                                 return c
782                         }
783                 }
784         }
785         return nil
786 }
787
788 type PieceStateChange struct {
789         Index int
790         PieceState
791 }
792
793 func (t *torrent) publishPieceChange(piece int) {
794         cur := t.pieceState(piece)
795         p := &t.Pieces[piece]
796         if cur != p.PublicPieceState {
797                 p.PublicPieceState = cur
798                 t.pieceStateChanges.Publish(PieceStateChange{
799                         piece,
800                         cur,
801                 })
802         }
803 }
804
805 func (t *torrent) pieceNumPendingChunks(piece int) int {
806         if t.pieceComplete(piece) {
807                 return 0
808         }
809         return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
810 }
811
812 func (t *torrent) pieceAllDirty(piece int) bool {
813         return t.Pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
814 }
815
816 func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
817         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
818                 if begin < end {
819                         if !f(begin) {
820                                 return false
821                         }
822                 }
823                 return true
824         })
825 }
826
827 func (t *torrent) readersChanged() {
828         t.updatePiecePriorities()
829 }
830
831 func (t *torrent) maybeNewConns() {
832         // Tickle the accept routine.
833         t.cl.event.Broadcast()
834         t.openNewConns()
835 }
836
837 func (t *torrent) piecePriorityChanged(piece int) {
838         for _, c := range t.Conns {
839                 c.updatePiecePriority(piece)
840         }
841         t.maybeNewConns()
842         t.publishPieceChange(piece)
843 }
844
845 func (t *torrent) updatePiecePriority(piece int) bool {
846         p := &t.Pieces[piece]
847         newPrio := t.piecePriorityUncached(piece)
848         if newPrio == p.priority {
849                 return false
850         }
851         p.priority = newPrio
852         return true
853 }
854
855 // Update all piece priorities in one hit. This function should have the same
856 // output as updatePiecePriority, but across all pieces.
857 func (t *torrent) updatePiecePriorities() {
858         newPrios := make([]piecePriority, t.numPieces())
859         t.pendingPieces.IterTyped(func(piece int) (more bool) {
860                 newPrios[piece] = PiecePriorityNormal
861                 return true
862         })
863         t.forReaderOffsetPieces(func(begin, end int) (next bool) {
864                 if begin < end {
865                         newPrios[begin].Raise(PiecePriorityNow)
866                 }
867                 for i := begin + 1; i < end; i++ {
868                         newPrios[i].Raise(PiecePriorityReadahead)
869                 }
870                 return true
871         })
872         t.completedPieces.IterTyped(func(piece int) (more bool) {
873                 newPrios[piece] = PiecePriorityNone
874                 return true
875         })
876         for i, prio := range newPrios {
877                 if prio != t.Pieces[i].priority {
878                         t.Pieces[i].priority = prio
879                         t.piecePriorityChanged(i)
880                 }
881         }
882 }
883
884 func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
885         if off >= t.length {
886                 return
887         }
888         if off < 0 {
889                 size += off
890                 off = 0
891         }
892         if size <= 0 {
893                 return
894         }
895         begin = int(off / t.Info.PieceLength)
896         end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
897         if end > t.Info.NumPieces() {
898                 end = t.Info.NumPieces()
899         }
900         return
901 }
902
903 // Returns true if all iterations complete without breaking.
904 func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
905         // There's an oppurtunity here to build a map of beginning pieces, and a
906         // bitmap of the rest. I wonder if it's worth the allocation overhead.
907         for r := range t.readers {
908                 r.mu.Lock()
909                 pos, readahead := r.pos, r.readahead
910                 r.mu.Unlock()
911                 if readahead < 1 {
912                         readahead = 1
913                 }
914                 begin, end := t.byteRegionPieces(pos, readahead)
915                 if begin >= end {
916                         continue
917                 }
918                 if !f(begin, end) {
919                         return false
920                 }
921         }
922         return true
923 }
924
925 func (t *torrent) piecePriority(piece int) piecePriority {
926         if !t.haveInfo() {
927                 return PiecePriorityNone
928         }
929         return t.Pieces[piece].priority
930 }
931
932 func (t *torrent) piecePriorityUncached(piece int) (ret piecePriority) {
933         ret = PiecePriorityNone
934         if t.pieceComplete(piece) {
935                 return
936         }
937         if t.pendingPieces.Contains(piece) {
938                 ret = PiecePriorityNormal
939         }
940         raiseRet := ret.Raise
941         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
942                 if piece == begin {
943                         raiseRet(PiecePriorityNow)
944                 }
945                 if begin <= piece && piece < end {
946                         raiseRet(PiecePriorityReadahead)
947                 }
948                 return true
949         })
950         return
951 }
952
953 func (t *torrent) pendPiece(piece int) {
954         if t.pendingPieces.Contains(piece) {
955                 return
956         }
957         if t.havePiece(piece) {
958                 return
959         }
960         t.pendingPieces.Add(piece)
961         if !t.updatePiecePriority(piece) {
962                 return
963         }
964         t.piecePriorityChanged(piece)
965 }
966
967 func (t *torrent) getCompletedPieces() (ret bitmap.Bitmap) {
968         return t.completedPieces.Copy()
969 }
970
971 func (t *torrent) unpendPieces(unpend *bitmap.Bitmap) {
972         t.pendingPieces.Sub(unpend)
973         t.updatePiecePriorities()
974 }
975
976 func (t *torrent) pendPieceRange(begin, end int) {
977         for i := begin; i < end; i++ {
978                 t.pendPiece(i)
979         }
980 }
981
982 func (t *torrent) unpendPieceRange(begin, end int) {
983         var bm bitmap.Bitmap
984         bm.AddRange(begin, end)
985         t.unpendPieces(&bm)
986 }
987
988 func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
989         if !c.PeerHasPiece(piece) {
990                 return true
991         }
992         chunkIndices := t.Pieces[piece].undirtiedChunkIndices().ToSortedSlice()
993         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
994                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
995                 return c.Request(req)
996         })
997 }
998
999 func (t *torrent) pendRequest(req request) {
1000         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1001         t.Pieces[req.Index].pendChunkIndex(ci)
1002 }
1003
1004 func (t *torrent) pieceChanged(piece int) {
1005         t.cl.pieceChanged(t, piece)
1006 }
1007
1008 func (t *torrent) openNewConns() {
1009         t.cl.openNewConns(t)
1010 }
1011
1012 func (t *torrent) getConnPieceInclination() []int {
1013         _ret := t.connPieceInclinationPool.Get()
1014         if _ret == nil {
1015                 pieceInclinationsNew.Add(1)
1016                 return rand.Perm(t.numPieces())
1017         }
1018         pieceInclinationsReused.Add(1)
1019         return _ret.([]int)
1020 }
1021
1022 func (t *torrent) putPieceInclination(pi []int) {
1023         t.connPieceInclinationPool.Put(pi)
1024         pieceInclinationsPut.Add(1)
1025 }
1026
1027 func (t *torrent) updatePieceCompletion(piece int) {
1028         t.completedPieces.Set(piece, t.pieceCompleteUncached(piece))
1029 }
1030
1031 // Non-blocking read. Client lock is not required.
1032 func (t *torrent) readAt(b []byte, off int64) (n int, err error) {
1033         p := &t.Pieces[off/t.Info.PieceLength]
1034         p.waitNoPendingWrites()
1035         return p.Storage().ReadAt(b, off-p.Info().Offset())
1036 }
1037
1038 func (t *torrent) updateAllPieceCompletions() {
1039         for i := range iter.N(t.numPieces()) {
1040                 t.updatePieceCompletion(i)
1041         }
1042 }