]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Remove deadcode
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "expvar"
6         "fmt"
7         "io"
8         "log"
9         "math"
10         "math/rand"
11         "net"
12         "sort"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/missinggo"
17         "github.com/anacrolix/missinggo/bitmap"
18         "github.com/anacrolix/missinggo/itertools"
19         "github.com/anacrolix/missinggo/perf"
20         "github.com/anacrolix/missinggo/pubsub"
21         "github.com/bradfitz/iter"
22
23         "github.com/anacrolix/torrent/bencode"
24         "github.com/anacrolix/torrent/metainfo"
25         pp "github.com/anacrolix/torrent/peer_protocol"
26         "github.com/anacrolix/torrent/storage"
27 )
28
29 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
30         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
31 }
32
33 type peersKey struct {
34         IPBytes string
35         Port    int
36 }
37
38 // Maintains state of torrent within a Client.
39 type Torrent struct {
40         cl *Client
41
42         closing chan struct{}
43
44         // Closed when no more network activity is desired. This includes
45         // announcing, and communicating with peers.
46         ceasingNetworking chan struct{}
47
48         infoHash metainfo.Hash
49         pieces   []piece
50         // Values are the piece indices that changed.
51         pieceStateChanges *pubsub.PubSub
52         chunkSize         pp.Integer
53         // Total length of the torrent in bytes. Stored because it's not O(1) to
54         // get this from the info dict.
55         length int64
56
57         // The storage to open when the info dict becomes available.
58         storageOpener storage.I
59         // Storage for torrent data.
60         storage storage.Torrent
61
62         // The info dict. nil if we don't have it (yet).
63         info *metainfo.InfoEx
64         // Active peer connections, running message stream loops.
65         conns []*connection
66         // Set of addrs to which we're attempting to connect. Connections are
67         // half-open until all handshakes are completed.
68         halfOpen map[string]struct{}
69
70         // Reserve of peers to connect to. A peer can be both here and in the
71         // active connections if were told about the peer after connecting with
72         // them. That encourages us to reconnect to peers that are well known.
73         peers     map[peersKey]Peer
74         wantPeers sync.Cond
75
76         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
77         // mirror their respective URLs from the announce-list metainfo key.
78         trackers []trackerTier
79         // Name used if the info name isn't available.
80         displayName string
81         // The bencoded bytes of the info dict.
82         metadataBytes []byte
83         // Each element corresponds to the 16KiB metadata pieces. If true, we have
84         // received that piece.
85         metadataCompletedChunks []bool
86
87         // Closed when .Info is set.
88         gotMetainfo chan struct{}
89
90         readers map[*Reader]struct{}
91
92         pendingPieces   bitmap.Bitmap
93         completedPieces bitmap.Bitmap
94
95         connPieceInclinationPool sync.Pool
96 }
97
98 var (
99         pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
100         pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
101         pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
102 )
103
104 func (t *Torrent) setDisplayName(dn string) {
105         t.displayName = dn
106 }
107
108 func (t *Torrent) pieceComplete(piece int) bool {
109         return t.completedPieces.Get(piece)
110 }
111
112 func (t *Torrent) pieceCompleteUncached(piece int) bool {
113         return t.pieces[piece].Storage().GetIsComplete()
114 }
115
116 func (t *Torrent) numConnsUnchoked() (num int) {
117         for _, c := range t.conns {
118                 if !c.PeerChoked {
119                         num++
120                 }
121         }
122         return
123 }
124
125 // There's a connection to that address already.
126 func (t *Torrent) addrActive(addr string) bool {
127         if _, ok := t.halfOpen[addr]; ok {
128                 return true
129         }
130         for _, c := range t.conns {
131                 if c.remoteAddr().String() == addr {
132                         return true
133                 }
134         }
135         return false
136 }
137
138 func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
139         wcs = &worstConns{
140                 c:  make([]*connection, 0, len(t.conns)),
141                 t:  t,
142                 cl: cl,
143         }
144         for _, c := range t.conns {
145                 if !c.closed.IsSet() {
146                         wcs.c = append(wcs.c, c)
147                 }
148         }
149         return
150 }
151
152 func (t *Torrent) ceaseNetworking() {
153         select {
154         case <-t.ceasingNetworking:
155                 return
156         default:
157         }
158         close(t.ceasingNetworking)
159         for _, c := range t.conns {
160                 c.Close()
161         }
162 }
163
164 func (t *Torrent) addPeer(p Peer, cl *Client) {
165         cl.openNewConns(t)
166         if len(t.peers) >= torrentPeersHighWater {
167                 return
168         }
169         key := peersKey{string(p.IP), p.Port}
170         if _, ok := t.peers[key]; ok {
171                 return
172         }
173         t.peers[key] = p
174         peersAddedBySource.Add(string(p.Source), 1)
175         cl.openNewConns(t)
176
177 }
178
179 func (t *Torrent) invalidateMetadata() {
180         t.metadataBytes = nil
181         t.metadataCompletedChunks = nil
182         t.info = nil
183 }
184
185 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
186         if t.haveInfo() {
187                 return
188         }
189         if index >= len(t.metadataCompletedChunks) {
190                 log.Printf("%s: ignoring metadata piece %d", t, index)
191                 return
192         }
193         copy(t.metadataBytes[(1<<14)*index:], data)
194         t.metadataCompletedChunks[index] = true
195 }
196
197 func (t *Torrent) metadataPieceCount() int {
198         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
199 }
200
201 func (t *Torrent) haveMetadataPiece(piece int) bool {
202         if t.haveInfo() {
203                 return (1<<14)*piece < len(t.metadataBytes)
204         } else {
205                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
206         }
207 }
208
209 func (t *Torrent) metadataSizeKnown() bool {
210         return t.metadataBytes != nil
211 }
212
213 func (t *Torrent) metadataSize() int {
214         return len(t.metadataBytes)
215 }
216
217 func infoPieceHashes(info *metainfo.Info) (ret []string) {
218         for i := 0; i < len(info.Pieces); i += 20 {
219                 ret = append(ret, string(info.Pieces[i:i+20]))
220         }
221         return
222 }
223
224 // Called when metadata for a torrent becomes available.
225 func (t *Torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
226         err = validateInfo(md)
227         if err != nil {
228                 err = fmt.Errorf("bad info: %s", err)
229                 return
230         }
231         t.info = &metainfo.InfoEx{
232                 Info:  *md,
233                 Bytes: infoBytes,
234                 Hash:  &t.infoHash,
235         }
236         t.storage, err = t.storageOpener.OpenTorrent(t.info)
237         if err != nil {
238                 return
239         }
240         t.length = 0
241         for _, f := range t.info.UpvertedFiles() {
242                 t.length += f.Length
243         }
244         t.metadataBytes = infoBytes
245         t.metadataCompletedChunks = nil
246         hashes := infoPieceHashes(md)
247         t.pieces = make([]piece, len(hashes))
248         for i, hash := range hashes {
249                 piece := &t.pieces[i]
250                 piece.t = t
251                 piece.index = i
252                 piece.noPendingWrites.L = &piece.pendingWritesMutex
253                 missinggo.CopyExact(piece.Hash[:], hash)
254         }
255         for _, conn := range t.conns {
256                 if err := conn.setNumPieces(t.numPieces()); err != nil {
257                         log.Printf("closing connection: %s", err)
258                         conn.Close()
259                 }
260         }
261         for i := range t.pieces {
262                 t.updatePieceCompletion(i)
263                 t.pieces[i].QueuedForHash = true
264         }
265         go func() {
266                 for i := range t.pieces {
267                         t.verifyPiece(i)
268                 }
269         }()
270         return
271 }
272
273 func (t *Torrent) verifyPiece(piece int) {
274         t.cl.verifyPiece(t, piece)
275 }
276
277 func (t *Torrent) haveAllMetadataPieces() bool {
278         if t.haveInfo() {
279                 return true
280         }
281         if t.metadataCompletedChunks == nil {
282                 return false
283         }
284         for _, have := range t.metadataCompletedChunks {
285                 if !have {
286                         return false
287                 }
288         }
289         return true
290 }
291
292 func (t *Torrent) setMetadataSize(bytes int64, cl *Client) {
293         if t.haveInfo() {
294                 // We already know the correct metadata size.
295                 return
296         }
297         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
298                 log.Printf("received bad metadata size: %d", bytes)
299                 return
300         }
301         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
302                 return
303         }
304         t.metadataBytes = make([]byte, bytes)
305         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
306         for _, c := range t.conns {
307                 cl.requestPendingMetadata(t, c)
308         }
309
310 }
311
312 // The current working name for the torrent. Either the name in the info dict,
313 // or a display name given such as by the dn value in a magnet link, or "".
314 func (t *Torrent) name() string {
315         if t.haveInfo() {
316                 return t.info.Name
317         }
318         return t.displayName
319 }
320
321 func (t *Torrent) pieceState(index int) (ret PieceState) {
322         p := &t.pieces[index]
323         ret.Priority = t.piecePriority(index)
324         if t.pieceComplete(index) {
325                 ret.Complete = true
326         }
327         if p.QueuedForHash || p.Hashing {
328                 ret.Checking = true
329         }
330         if !ret.Complete && t.piecePartiallyDownloaded(index) {
331                 ret.Partial = true
332         }
333         return
334 }
335
336 func (t *Torrent) metadataPieceSize(piece int) int {
337         return metadataPieceSize(len(t.metadataBytes), piece)
338 }
339
340 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
341         d := map[string]int{
342                 "msg_type": msgType,
343                 "piece":    piece,
344         }
345         if data != nil {
346                 d["total_size"] = len(t.metadataBytes)
347         }
348         p, err := bencode.Marshal(d)
349         if err != nil {
350                 panic(err)
351         }
352         return pp.Message{
353                 Type:            pp.Extended,
354                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
355                 ExtendedPayload: append(p, data...),
356         }
357 }
358
359 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
360         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
361                 ret = append(ret, PieceStateRun{
362                         PieceState: el.(PieceState),
363                         Length:     int(count),
364                 })
365         })
366         for index := range t.pieces {
367                 rle.Append(t.pieceState(index), 1)
368         }
369         rle.Flush()
370         return
371 }
372
373 // Produces a small string representing a PieceStateRun.
374 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
375         ret = fmt.Sprintf("%d", psr.Length)
376         ret += func() string {
377                 switch psr.Priority {
378                 case PiecePriorityNext:
379                         return "N"
380                 case PiecePriorityNormal:
381                         return "."
382                 case PiecePriorityReadahead:
383                         return "R"
384                 case PiecePriorityNow:
385                         return "!"
386                 default:
387                         return ""
388                 }
389         }()
390         if psr.Checking {
391                 ret += "H"
392         }
393         if psr.Partial {
394                 ret += "P"
395         }
396         if psr.Complete {
397                 ret += "C"
398         }
399         return
400 }
401
402 func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
403         fmt.Fprintf(w, "Infohash: %x\n", t.infoHash)
404         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
405         if !t.haveInfo() {
406                 fmt.Fprintf(w, "Metadata have: ")
407                 for _, h := range t.metadataCompletedChunks {
408                         fmt.Fprintf(w, "%c", func() rune {
409                                 if h {
410                                         return 'H'
411                                 } else {
412                                         return '.'
413                                 }
414                         }())
415                 }
416                 fmt.Fprintln(w)
417         }
418         fmt.Fprintf(w, "Piece length: %s\n", func() string {
419                 if t.haveInfo() {
420                         return fmt.Sprint(t.usualPieceSize())
421                 } else {
422                         return "?"
423                 }
424         }())
425         if t.haveInfo() {
426                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
427                 fmt.Fprint(w, "Piece States:")
428                 for _, psr := range t.pieceStateRuns() {
429                         w.Write([]byte(" "))
430                         w.Write([]byte(pieceStateRunStatusChars(psr)))
431                 }
432                 fmt.Fprintln(w)
433         }
434         fmt.Fprintf(w, "Reader Pieces:")
435         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
436                 fmt.Fprintf(w, " %d:%d", begin, end)
437                 return true
438         })
439         fmt.Fprintln(w)
440         fmt.Fprintf(w, "Trackers: ")
441         for _, tier := range t.trackers {
442                 for _, tr := range tier {
443                         fmt.Fprintf(w, "%q ", tr)
444                 }
445         }
446         fmt.Fprintf(w, "\n")
447         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
448         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
449         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
450         sort.Sort(&worstConns{
451                 c:  t.conns,
452                 t:  t,
453                 cl: cl,
454         })
455         for i, c := range t.conns {
456                 fmt.Fprintf(w, "%2d. ", i+1)
457                 c.WriteStatus(w, t)
458         }
459 }
460
461 func (t *Torrent) String() string {
462         s := t.Name()
463         if s == "" {
464                 s = fmt.Sprintf("%x", t.infoHash)
465         }
466         return s
467 }
468
469 func (t *Torrent) haveInfo() bool {
470         return t.info != nil
471 }
472
473 // TODO: Include URIs that weren't converted to tracker clients.
474 func (t *Torrent) announceList() (al [][]string) {
475         missinggo.CastSlice(&al, t.trackers)
476         return
477 }
478
479 // Returns a run-time generated MetaInfo that includes the info bytes and
480 // announce-list as currently known to the client.
481 func (t *Torrent) metainfo() *metainfo.MetaInfo {
482         if t.metadataBytes == nil {
483                 panic("info bytes not set")
484         }
485         return &metainfo.MetaInfo{
486                 Info:         *t.info,
487                 CreationDate: time.Now().Unix(),
488                 Comment:      "dynamic metainfo from client",
489                 CreatedBy:    "go.torrent",
490                 AnnounceList: t.announceList(),
491         }
492 }
493
494 func (t *Torrent) bytesLeft() (left int64) {
495         for i := 0; i < t.numPieces(); i++ {
496                 left += int64(t.pieces[i].bytesLeft())
497         }
498         return
499 }
500
501 // Bytes left to give in tracker announces.
502 func (t *Torrent) bytesLeftAnnounce() uint64 {
503         if t.haveInfo() {
504                 return uint64(t.bytesLeft())
505         } else {
506                 return math.MaxUint64
507         }
508 }
509
510 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
511         if t.pieceComplete(piece) {
512                 return false
513         }
514         if t.pieceAllDirty(piece) {
515                 return false
516         }
517         return t.pieces[piece].hasDirtyChunks()
518 }
519
520 func (t *Torrent) usualPieceSize() int {
521         return int(t.info.PieceLength)
522 }
523
524 func (t *Torrent) lastPieceSize() int {
525         return int(t.pieceLength(t.numPieces() - 1))
526 }
527
528 func (t *Torrent) numPieces() int {
529         return t.info.NumPieces()
530 }
531
532 func (t *Torrent) numPiecesCompleted() (num int) {
533         return t.completedPieces.Len()
534 }
535
536 // Safe to call with or without client lock.
537 func (t *Torrent) isClosed() bool {
538         select {
539         case <-t.closing:
540                 return true
541         default:
542                 return false
543         }
544 }
545
546 func (t *Torrent) close() (err error) {
547         if t.isClosed() {
548                 return
549         }
550         t.ceaseNetworking()
551         close(t.closing)
552         if c, ok := t.storage.(io.Closer); ok {
553                 c.Close()
554         }
555         for _, conn := range t.conns {
556                 conn.Close()
557         }
558         t.pieceStateChanges.Close()
559         return
560 }
561
562 func (t *Torrent) requestOffset(r request) int64 {
563         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
564 }
565
566 // Return the request that would include the given offset into the torrent
567 // data. Returns !ok if there is no such request.
568 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
569         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
570 }
571
572 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
573         tr := perf.NewTimer()
574
575         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
576         if err == nil && n != len(data) {
577                 err = io.ErrShortWrite
578         }
579         if err == nil {
580                 tr.Stop("write chunk")
581         }
582         return
583 }
584
585 func (t *Torrent) bitfield() (bf []bool) {
586         bf = make([]bool, t.numPieces())
587         t.completedPieces.IterTyped(func(piece int) (again bool) {
588                 bf[piece] = true
589                 return true
590         })
591         return
592 }
593
594 func (t *Torrent) validOutgoingRequest(r request) bool {
595         if r.Index >= pp.Integer(t.info.NumPieces()) {
596                 return false
597         }
598         if r.Begin%t.chunkSize != 0 {
599                 return false
600         }
601         if r.Length > t.chunkSize {
602                 return false
603         }
604         pieceLength := t.pieceLength(int(r.Index))
605         if r.Begin+r.Length > pieceLength {
606                 return false
607         }
608         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
609 }
610
611 func (t *Torrent) pieceChunks(piece int) (css []chunkSpec) {
612         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
613         var cs chunkSpec
614         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
615                 cs.Length = left
616                 if cs.Length > t.chunkSize {
617                         cs.Length = t.chunkSize
618                 }
619                 css = append(css, cs)
620                 cs.Begin += cs.Length
621         }
622         return
623 }
624
625 func (t *Torrent) pieceNumChunks(piece int) int {
626         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
627 }
628
629 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
630         t.pieces[pieceIndex].DirtyChunks.Clear()
631 }
632
633 type Peer struct {
634         Id     [20]byte
635         IP     net.IP
636         Port   int
637         Source peerSource
638         // Peer is known to support encryption.
639         SupportsEncryption bool
640 }
641
642 func (t *Torrent) pieceLength(piece int) (len_ pp.Integer) {
643         if piece < 0 || piece >= t.info.NumPieces() {
644                 return
645         }
646         if int(piece) == t.numPieces()-1 {
647                 len_ = pp.Integer(t.length % t.info.PieceLength)
648         }
649         if len_ == 0 {
650                 len_ = pp.Integer(t.info.PieceLength)
651         }
652         return
653 }
654
655 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
656         hash := pieceHash.New()
657         p := &t.pieces[piece]
658         p.waitNoPendingWrites()
659         ip := t.info.Piece(piece)
660         pl := ip.Length()
661         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
662         if n == pl {
663                 missinggo.CopyExact(&ret, hash.Sum(nil))
664                 return
665         }
666         if err != io.ErrUnexpectedEOF {
667                 log.Printf("unexpected error hashing piece with %T: %s", t.storage, err)
668         }
669         return
670 }
671
672 func (t *Torrent) haveAllPieces() bool {
673         if !t.haveInfo() {
674                 return false
675         }
676         return t.completedPieces.Len() == t.numPieces()
677 }
678
679 func (me *Torrent) haveAnyPieces() bool {
680         for i := range me.pieces {
681                 if me.pieceComplete(i) {
682                         return true
683                 }
684         }
685         return false
686 }
687
688 func (t *Torrent) havePiece(index int) bool {
689         return t.haveInfo() && t.pieceComplete(index)
690 }
691
692 func (t *Torrent) haveChunk(r request) (ret bool) {
693         // defer func() {
694         //      log.Println("have chunk", r, ret)
695         // }()
696         if !t.haveInfo() {
697                 return false
698         }
699         if t.pieceComplete(int(r.Index)) {
700                 return true
701         }
702         p := &t.pieces[r.Index]
703         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
704 }
705
706 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
707         return int(cs.Begin / chunkSize)
708 }
709
710 // TODO: This should probably be called wantPiece.
711 func (t *Torrent) wantChunk(r request) bool {
712         if !t.wantPiece(int(r.Index)) {
713                 return false
714         }
715         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
716                 return true
717         }
718         // TODO: What about pieces that were wanted, but aren't now, and aren't
719         // completed either? That used to be done here.
720         return false
721 }
722
723 // TODO: This should be called wantPieceIndex.
724 func (t *Torrent) wantPiece(index int) bool {
725         if !t.haveInfo() {
726                 return false
727         }
728         p := &t.pieces[index]
729         if p.QueuedForHash {
730                 return false
731         }
732         if p.Hashing {
733                 return false
734         }
735         if t.pieceComplete(index) {
736                 return false
737         }
738         if t.pendingPieces.Contains(index) {
739                 return true
740         }
741         return !t.forReaderOffsetPieces(func(begin, end int) bool {
742                 return index < begin || index >= end
743         })
744 }
745
746 func (t *Torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
747         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
748                 for i := begin; begin < end; i++ {
749                         if !f(i) {
750                                 return false
751                         }
752                 }
753                 return true
754         })
755 }
756
757 func (t *Torrent) connHasWantedPieces(c *connection) bool {
758         return !c.pieceRequestOrder.IsEmpty()
759 }
760
761 func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
762         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
763                 pieces = append(pieces, int(i))
764         }
765         return
766 }
767
768 func (t *Torrent) worstBadConn(cl *Client) *connection {
769         wcs := t.worstConns(cl)
770         heap.Init(wcs)
771         for wcs.Len() != 0 {
772                 c := heap.Pop(wcs).(*connection)
773                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
774                         return c
775                 }
776                 if wcs.Len() >= (socketsPerTorrent+1)/2 {
777                         // Give connections 1 minute to prove themselves.
778                         if time.Since(c.completedHandshake) > time.Minute {
779                                 return c
780                         }
781                 }
782         }
783         return nil
784 }
785
786 type PieceStateChange struct {
787         Index int
788         PieceState
789 }
790
791 func (t *Torrent) publishPieceChange(piece int) {
792         cur := t.pieceState(piece)
793         p := &t.pieces[piece]
794         if cur != p.PublicPieceState {
795                 p.PublicPieceState = cur
796                 t.pieceStateChanges.Publish(PieceStateChange{
797                         piece,
798                         cur,
799                 })
800         }
801 }
802
803 func (t *Torrent) pieceNumPendingChunks(piece int) int {
804         if t.pieceComplete(piece) {
805                 return 0
806         }
807         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
808 }
809
810 func (t *Torrent) pieceAllDirty(piece int) bool {
811         return t.pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
812 }
813
814 func (t *Torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
815         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
816                 if begin < end {
817                         if !f(begin) {
818                                 return false
819                         }
820                 }
821                 return true
822         })
823 }
824
825 func (t *Torrent) readersChanged() {
826         t.updatePiecePriorities()
827 }
828
829 func (t *Torrent) maybeNewConns() {
830         // Tickle the accept routine.
831         t.cl.event.Broadcast()
832         t.openNewConns()
833 }
834
835 func (t *Torrent) piecePriorityChanged(piece int) {
836         for _, c := range t.conns {
837                 c.updatePiecePriority(piece)
838         }
839         t.maybeNewConns()
840         t.publishPieceChange(piece)
841 }
842
843 func (t *Torrent) updatePiecePriority(piece int) bool {
844         p := &t.pieces[piece]
845         newPrio := t.piecePriorityUncached(piece)
846         if newPrio == p.priority {
847                 return false
848         }
849         p.priority = newPrio
850         return true
851 }
852
853 // Update all piece priorities in one hit. This function should have the same
854 // output as updatePiecePriority, but across all pieces.
855 func (t *Torrent) updatePiecePriorities() {
856         newPrios := make([]piecePriority, t.numPieces())
857         t.pendingPieces.IterTyped(func(piece int) (more bool) {
858                 newPrios[piece] = PiecePriorityNormal
859                 return true
860         })
861         t.forReaderOffsetPieces(func(begin, end int) (next bool) {
862                 if begin < end {
863                         newPrios[begin].Raise(PiecePriorityNow)
864                 }
865                 for i := begin + 1; i < end; i++ {
866                         newPrios[i].Raise(PiecePriorityReadahead)
867                 }
868                 return true
869         })
870         t.completedPieces.IterTyped(func(piece int) (more bool) {
871                 newPrios[piece] = PiecePriorityNone
872                 return true
873         })
874         for i, prio := range newPrios {
875                 if prio != t.pieces[i].priority {
876                         t.pieces[i].priority = prio
877                         t.piecePriorityChanged(i)
878                 }
879         }
880 }
881
882 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
883         if off >= t.length {
884                 return
885         }
886         if off < 0 {
887                 size += off
888                 off = 0
889         }
890         if size <= 0 {
891                 return
892         }
893         begin = int(off / t.info.PieceLength)
894         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
895         if end > t.info.NumPieces() {
896                 end = t.info.NumPieces()
897         }
898         return
899 }
900
901 // Returns true if all iterations complete without breaking.
902 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
903         // There's an oppurtunity here to build a map of beginning pieces, and a
904         // bitmap of the rest. I wonder if it's worth the allocation overhead.
905         for r := range t.readers {
906                 r.mu.Lock()
907                 pos, readahead := r.pos, r.readahead
908                 r.mu.Unlock()
909                 if readahead < 1 {
910                         readahead = 1
911                 }
912                 begin, end := t.byteRegionPieces(pos, readahead)
913                 if begin >= end {
914                         continue
915                 }
916                 if !f(begin, end) {
917                         return false
918                 }
919         }
920         return true
921 }
922
923 func (t *Torrent) piecePriority(piece int) piecePriority {
924         if !t.haveInfo() {
925                 return PiecePriorityNone
926         }
927         return t.pieces[piece].priority
928 }
929
930 func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
931         ret = PiecePriorityNone
932         if t.pieceComplete(piece) {
933                 return
934         }
935         if t.pendingPieces.Contains(piece) {
936                 ret = PiecePriorityNormal
937         }
938         raiseRet := ret.Raise
939         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
940                 if piece == begin {
941                         raiseRet(PiecePriorityNow)
942                 }
943                 if begin <= piece && piece < end {
944                         raiseRet(PiecePriorityReadahead)
945                 }
946                 return true
947         })
948         return
949 }
950
951 func (t *Torrent) pendPiece(piece int) {
952         if t.pendingPieces.Contains(piece) {
953                 return
954         }
955         if t.havePiece(piece) {
956                 return
957         }
958         t.pendingPieces.Add(piece)
959         if !t.updatePiecePriority(piece) {
960                 return
961         }
962         t.piecePriorityChanged(piece)
963 }
964
965 func (t *Torrent) getCompletedPieces() (ret bitmap.Bitmap) {
966         return t.completedPieces.Copy()
967 }
968
969 func (t *Torrent) unpendPieces(unpend *bitmap.Bitmap) {
970         t.pendingPieces.Sub(unpend)
971         t.updatePiecePriorities()
972 }
973
974 func (t *Torrent) pendPieceRange(begin, end int) {
975         for i := begin; i < end; i++ {
976                 t.pendPiece(i)
977         }
978 }
979
980 func (t *Torrent) unpendPieceRange(begin, end int) {
981         var bm bitmap.Bitmap
982         bm.AddRange(begin, end)
983         t.unpendPieces(&bm)
984 }
985
986 func (t *Torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
987         if !c.PeerHasPiece(piece) {
988                 return true
989         }
990         chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
991         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
992                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
993                 return c.Request(req)
994         })
995 }
996
997 func (t *Torrent) pendRequest(req request) {
998         ci := chunkIndex(req.chunkSpec, t.chunkSize)
999         t.pieces[req.Index].pendChunkIndex(ci)
1000 }
1001
1002 func (t *Torrent) pieceChanged(piece int) {
1003         t.cl.pieceChanged(t, piece)
1004 }
1005
1006 func (t *Torrent) openNewConns() {
1007         t.cl.openNewConns(t)
1008 }
1009
1010 func (t *Torrent) getConnPieceInclination() []int {
1011         _ret := t.connPieceInclinationPool.Get()
1012         if _ret == nil {
1013                 pieceInclinationsNew.Add(1)
1014                 return rand.Perm(t.numPieces())
1015         }
1016         pieceInclinationsReused.Add(1)
1017         return _ret.([]int)
1018 }
1019
1020 func (t *Torrent) putPieceInclination(pi []int) {
1021         t.connPieceInclinationPool.Put(pi)
1022         pieceInclinationsPut.Add(1)
1023 }
1024
1025 func (t *Torrent) updatePieceCompletion(piece int) {
1026         pcu := t.pieceCompleteUncached(piece)
1027         changed := t.completedPieces.Get(piece) != pcu
1028         t.completedPieces.Set(piece, pcu)
1029         if changed {
1030                 t.pieceChanged(piece)
1031         }
1032 }
1033
1034 // Non-blocking read. Client lock is not required.
1035 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1036         p := &t.pieces[off/t.info.PieceLength]
1037         p.waitNoPendingWrites()
1038         return p.Storage().ReadAt(b, off-p.Info().Offset())
1039 }
1040
1041 func (t *Torrent) updateAllPieceCompletions() {
1042         for i := range iter.N(t.numPieces()) {
1043                 t.updatePieceCompletion(i)
1044         }
1045 }