]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Ditch Data.WriteSectionTo, and fix cmd/torrent-verify
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "expvar"
6         "fmt"
7         "io"
8         "log"
9         "math/rand"
10         "net"
11         "sort"
12         "sync"
13         "time"
14
15         "github.com/anacrolix/missinggo"
16         "github.com/anacrolix/missinggo/bitmap"
17         "github.com/anacrolix/missinggo/itertools"
18         "github.com/anacrolix/missinggo/perf"
19         "github.com/anacrolix/missinggo/pubsub"
20
21         "github.com/anacrolix/torrent/bencode"
22         "github.com/anacrolix/torrent/metainfo"
23         pp "github.com/anacrolix/torrent/peer_protocol"
24 )
25
26 func (t *torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
27         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
28 }
29
30 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
31         if t.pieceComplete(index) {
32                 return
33         }
34         piece := &t.Pieces[index]
35         count = t.pieceLength(index)
36         if !piece.EverHashed {
37                 return
38         }
39         regularDirty := piece.numDirtyChunks()
40         lastChunkIndex := t.pieceNumChunks(index) - 1
41         if piece.pendingChunkIndex(lastChunkIndex) {
42                 regularDirty--
43                 count -= t.chunkIndexSpec(lastChunkIndex, index).Length
44         }
45         count -= pp.Integer(regularDirty) * t.chunkSize
46         return
47 }
48
49 type peersKey struct {
50         IPBytes string
51         Port    int
52 }
53
54 // Is not aware of Client. Maintains state of torrent for with-in a Client.
55 type torrent struct {
56         cl *Client
57
58         stateMu sync.Mutex
59         closing chan struct{}
60
61         // Closed when no more network activity is desired. This includes
62         // announcing, and communicating with peers.
63         ceasingNetworking chan struct{}
64
65         InfoHash InfoHash
66         Pieces   []piece
67         // Values are the piece indices that changed.
68         pieceStateChanges *pubsub.PubSub
69         chunkSize         pp.Integer
70         // Total length of the torrent in bytes. Stored because it's not O(1) to
71         // get this from the info dict.
72         length int64
73
74         data Data
75
76         // The info dict. Nil if we don't have it (yet).
77         Info *metainfo.Info
78         // Active peer connections, running message stream loops.
79         Conns []*connection
80         // Set of addrs to which we're attempting to connect. Connections are
81         // half-open until all handshakes are completed.
82         HalfOpen map[string]struct{}
83
84         // Reserve of peers to connect to. A peer can be both here and in the
85         // active connections if were told about the peer after connecting with
86         // them. That encourages us to reconnect to peers that are well known.
87         Peers     map[peersKey]Peer
88         wantPeers sync.Cond
89
90         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
91         // mirror their respective URLs from the announce-list metainfo key.
92         Trackers []trackerTier
93         // Name used if the info name isn't available.
94         displayName string
95         // The bencoded bytes of the info dict.
96         MetaData []byte
97         // Each element corresponds to the 16KiB metadata pieces. If true, we have
98         // received that piece.
99         metadataHave []bool
100
101         // Closed when .Info is set.
102         gotMetainfo chan struct{}
103
104         readers map[*Reader]struct{}
105
106         pendingPieces   bitmap.Bitmap
107         completedPieces bitmap.Bitmap
108
109         connPieceInclinationPool sync.Pool
110 }
111
112 var (
113         pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
114         pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
115         pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
116 )
117
118 func (t *torrent) setDisplayName(dn string) {
119         t.displayName = dn
120 }
121
122 func (t *torrent) pieceComplete(piece int) bool {
123         return t.completedPieces.Get(piece)
124 }
125
126 func (t *torrent) pieceCompleteUncached(piece int) bool {
127         // TODO: This is called when setting metadata, and before storage is
128         // assigned, which doesn't seem right.
129         return t.data != nil && t.data.PieceComplete(piece)
130 }
131
132 func (t *torrent) numConnsUnchoked() (num int) {
133         for _, c := range t.Conns {
134                 if !c.PeerChoked {
135                         num++
136                 }
137         }
138         return
139 }
140
141 // There's a connection to that address already.
142 func (t *torrent) addrActive(addr string) bool {
143         if _, ok := t.HalfOpen[addr]; ok {
144                 return true
145         }
146         for _, c := range t.Conns {
147                 if c.remoteAddr().String() == addr {
148                         return true
149                 }
150         }
151         return false
152 }
153
154 func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
155         wcs = &worstConns{
156                 c:  make([]*connection, 0, len(t.Conns)),
157                 t:  t,
158                 cl: cl,
159         }
160         for _, c := range t.Conns {
161                 if !c.closed.IsSet() {
162                         wcs.c = append(wcs.c, c)
163                 }
164         }
165         return
166 }
167
168 func (t *torrent) ceaseNetworking() {
169         t.stateMu.Lock()
170         defer t.stateMu.Unlock()
171         select {
172         case <-t.ceasingNetworking:
173                 return
174         default:
175         }
176         close(t.ceasingNetworking)
177         for _, c := range t.Conns {
178                 c.Close()
179         }
180 }
181
182 func (t *torrent) addPeer(p Peer, cl *Client) {
183         cl.openNewConns(t)
184         if len(t.Peers) >= torrentPeersHighWater {
185                 return
186         }
187         key := peersKey{string(p.IP), p.Port}
188         if _, ok := t.Peers[key]; ok {
189                 return
190         }
191         t.Peers[key] = p
192         peersAddedBySource.Add(string(p.Source), 1)
193         cl.openNewConns(t)
194
195 }
196
197 func (t *torrent) invalidateMetadata() {
198         t.MetaData = nil
199         t.metadataHave = nil
200         t.Info = nil
201 }
202
203 func (t *torrent) saveMetadataPiece(index int, data []byte) {
204         if t.haveInfo() {
205                 return
206         }
207         if index >= len(t.metadataHave) {
208                 log.Printf("%s: ignoring metadata piece %d", t, index)
209                 return
210         }
211         copy(t.MetaData[(1<<14)*index:], data)
212         t.metadataHave[index] = true
213 }
214
215 func (t *torrent) metadataPieceCount() int {
216         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
217 }
218
219 func (t *torrent) haveMetadataPiece(piece int) bool {
220         if t.haveInfo() {
221                 return (1<<14)*piece < len(t.MetaData)
222         } else {
223                 return piece < len(t.metadataHave) && t.metadataHave[piece]
224         }
225 }
226
227 func (t *torrent) metadataSizeKnown() bool {
228         return t.MetaData != nil
229 }
230
231 func (t *torrent) metadataSize() int {
232         return len(t.MetaData)
233 }
234
235 func infoPieceHashes(info *metainfo.Info) (ret []string) {
236         for i := 0; i < len(info.Pieces); i += 20 {
237                 ret = append(ret, string(info.Pieces[i:i+20]))
238         }
239         return
240 }
241
242 // Called when metadata for a torrent becomes available.
243 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
244         err = validateInfo(md)
245         if err != nil {
246                 err = fmt.Errorf("bad info: %s", err)
247                 return
248         }
249         t.Info = md
250         t.length = 0
251         for _, f := range t.Info.UpvertedFiles() {
252                 t.length += f.Length
253         }
254         t.MetaData = infoBytes
255         t.metadataHave = nil
256         hashes := infoPieceHashes(md)
257         t.Pieces = make([]piece, len(hashes))
258         for i, hash := range hashes {
259                 piece := &t.Pieces[i]
260                 piece.t = t
261                 piece.index = i
262                 piece.noPendingWrites.L = &piece.pendingWritesMutex
263                 missinggo.CopyExact(piece.Hash[:], hash)
264         }
265         for _, conn := range t.Conns {
266                 if err := conn.setNumPieces(t.numPieces()); err != nil {
267                         log.Printf("closing connection: %s", err)
268                         conn.Close()
269                 }
270         }
271         return
272 }
273
274 func (t *torrent) setStorage(td Data) {
275         if t.data != nil {
276                 t.data.Close()
277         }
278         t.data = td
279         for i := range t.Pieces {
280                 t.updatePieceCompletion(i)
281                 t.Pieces[i].QueuedForHash = true
282         }
283         go func() {
284                 for i := range t.Pieces {
285                         t.verifyPiece(i)
286                 }
287         }()
288 }
289
290 func (t *torrent) verifyPiece(piece int) {
291         t.cl.verifyPiece(t, piece)
292 }
293
294 func (t *torrent) haveAllMetadataPieces() bool {
295         if t.haveInfo() {
296                 return true
297         }
298         if t.metadataHave == nil {
299                 return false
300         }
301         for _, have := range t.metadataHave {
302                 if !have {
303                         return false
304                 }
305         }
306         return true
307 }
308
309 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
310         if t.haveInfo() {
311                 // We already know the correct metadata size.
312                 return
313         }
314         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
315                 log.Printf("received bad metadata size: %d", bytes)
316                 return
317         }
318         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
319                 return
320         }
321         t.MetaData = make([]byte, bytes)
322         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
323         for _, c := range t.Conns {
324                 cl.requestPendingMetadata(t, c)
325         }
326
327 }
328
329 // The current working name for the torrent. Either the name in the info dict,
330 // or a display name given such as by the dn value in a magnet link, or "".
331 func (t *torrent) Name() string {
332         if t.haveInfo() {
333                 return t.Info.Name
334         }
335         return t.displayName
336 }
337
338 func (t *torrent) pieceState(index int) (ret PieceState) {
339         p := &t.Pieces[index]
340         ret.Priority = t.piecePriority(index)
341         if t.pieceComplete(index) {
342                 ret.Complete = true
343         }
344         if p.QueuedForHash || p.Hashing {
345                 ret.Checking = true
346         }
347         if !ret.Complete && t.piecePartiallyDownloaded(index) {
348                 ret.Partial = true
349         }
350         return
351 }
352
353 func (t *torrent) metadataPieceSize(piece int) int {
354         return metadataPieceSize(len(t.MetaData), piece)
355 }
356
357 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
358         d := map[string]int{
359                 "msg_type": msgType,
360                 "piece":    piece,
361         }
362         if data != nil {
363                 d["total_size"] = len(t.MetaData)
364         }
365         p, err := bencode.Marshal(d)
366         if err != nil {
367                 panic(err)
368         }
369         return pp.Message{
370                 Type:            pp.Extended,
371                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
372                 ExtendedPayload: append(p, data...),
373         }
374 }
375
376 func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
377         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
378                 ret = append(ret, PieceStateRun{
379                         PieceState: el.(PieceState),
380                         Length:     int(count),
381                 })
382         })
383         for index := range t.Pieces {
384                 rle.Append(t.pieceState(index), 1)
385         }
386         rle.Flush()
387         return
388 }
389
390 // Produces a small string representing a PieceStateRun.
391 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
392         ret = fmt.Sprintf("%d", psr.Length)
393         ret += func() string {
394                 switch psr.Priority {
395                 case PiecePriorityNext:
396                         return "N"
397                 case PiecePriorityNormal:
398                         return "."
399                 case PiecePriorityReadahead:
400                         return "R"
401                 case PiecePriorityNow:
402                         return "!"
403                 default:
404                         return ""
405                 }
406         }()
407         if psr.Checking {
408                 ret += "H"
409         }
410         if psr.Partial {
411                 ret += "P"
412         }
413         if psr.Complete {
414                 ret += "C"
415         }
416         return
417 }
418
419 func (t *torrent) writeStatus(w io.Writer, cl *Client) {
420         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
421         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
422         if !t.haveInfo() {
423                 fmt.Fprintf(w, "Metadata have: ")
424                 for _, h := range t.metadataHave {
425                         fmt.Fprintf(w, "%c", func() rune {
426                                 if h {
427                                         return 'H'
428                                 } else {
429                                         return '.'
430                                 }
431                         }())
432                 }
433                 fmt.Fprintln(w)
434         }
435         fmt.Fprintf(w, "Piece length: %s\n", func() string {
436                 if t.haveInfo() {
437                         return fmt.Sprint(t.usualPieceSize())
438                 } else {
439                         return "?"
440                 }
441         }())
442         if t.haveInfo() {
443                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
444                 fmt.Fprint(w, "Piece States:")
445                 for _, psr := range t.pieceStateRuns() {
446                         w.Write([]byte(" "))
447                         w.Write([]byte(pieceStateRunStatusChars(psr)))
448                 }
449                 fmt.Fprintln(w)
450         }
451         fmt.Fprintf(w, "Reader Pieces:")
452         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
453                 fmt.Fprintf(w, " %d:%d", begin, end)
454                 return true
455         })
456         fmt.Fprintln(w)
457         fmt.Fprintf(w, "Trackers: ")
458         for _, tier := range t.Trackers {
459                 for _, tr := range tier {
460                         fmt.Fprintf(w, "%q ", tr)
461                 }
462         }
463         fmt.Fprintf(w, "\n")
464         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
465         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
466         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
467         sort.Sort(&worstConns{
468                 c:  t.Conns,
469                 t:  t,
470                 cl: cl,
471         })
472         for i, c := range t.Conns {
473                 fmt.Fprintf(w, "%2d. ", i+1)
474                 c.WriteStatus(w, t)
475         }
476 }
477
478 func (t *torrent) String() string {
479         s := t.Name()
480         if s == "" {
481                 s = fmt.Sprintf("%x", t.InfoHash)
482         }
483         return s
484 }
485
486 func (t *torrent) haveInfo() bool {
487         return t.Info != nil
488 }
489
490 // TODO: Include URIs that weren't converted to tracker clients.
491 func (t *torrent) announceList() (al [][]string) {
492         missinggo.CastSlice(&al, t.Trackers)
493         return
494 }
495
496 // Returns a run-time generated MetaInfo that includes the info bytes and
497 // announce-list as currently known to the client.
498 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
499         if t.MetaData == nil {
500                 panic("info bytes not set")
501         }
502         return &metainfo.MetaInfo{
503                 Info: metainfo.InfoEx{
504                         Info:  *t.Info,
505                         Bytes: t.MetaData,
506                 },
507                 CreationDate: time.Now().Unix(),
508                 Comment:      "dynamic metainfo from client",
509                 CreatedBy:    "go.torrent",
510                 AnnounceList: t.announceList(),
511         }
512 }
513
514 func (t *torrent) bytesLeft() (left int64) {
515         if !t.haveInfo() {
516                 return -1
517         }
518         for i := 0; i < t.numPieces(); i++ {
519                 left += int64(t.pieceNumPendingBytes(i))
520         }
521         return
522 }
523
524 func (t *torrent) piecePartiallyDownloaded(piece int) bool {
525         if t.pieceComplete(piece) {
526                 return false
527         }
528         if t.pieceAllDirty(piece) {
529                 return false
530         }
531         return t.Pieces[piece].hasDirtyChunks()
532 }
533
534 func numChunksForPiece(chunkSize int, pieceSize int) int {
535         return (pieceSize + chunkSize - 1) / chunkSize
536 }
537
538 func (t *torrent) usualPieceSize() int {
539         return int(t.Info.PieceLength)
540 }
541
542 func (t *torrent) lastPieceSize() int {
543         return int(t.pieceLength(t.numPieces() - 1))
544 }
545
546 func (t *torrent) numPieces() int {
547         return t.Info.NumPieces()
548 }
549
550 func (t *torrent) numPiecesCompleted() (num int) {
551         return t.completedPieces.Len()
552 }
553
554 // Safe to call with or without client lock.
555 func (t *torrent) isClosed() bool {
556         select {
557         case <-t.closing:
558                 return true
559         default:
560                 return false
561         }
562 }
563
564 func (t *torrent) close() (err error) {
565         if t.isClosed() {
566                 return
567         }
568         t.ceaseNetworking()
569         close(t.closing)
570         if c, ok := t.data.(io.Closer); ok {
571                 c.Close()
572         }
573         for _, conn := range t.Conns {
574                 conn.Close()
575         }
576         t.pieceStateChanges.Close()
577         return
578 }
579
580 func (t *torrent) requestOffset(r request) int64 {
581         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
582 }
583
584 // Return the request that would include the given offset into the torrent
585 // data. Returns !ok if there is no such request.
586 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
587         return torrentOffsetRequest(t.length, t.Info.PieceLength, int64(t.chunkSize), off)
588 }
589
590 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
591         tr := perf.NewTimer()
592         n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
593         if err == nil && n != len(data) {
594                 err = io.ErrShortWrite
595         }
596         if err == nil {
597                 tr.Stop("write chunk")
598         }
599         return
600 }
601
602 func (t *torrent) bitfield() (bf []bool) {
603         bf = make([]bool, t.numPieces())
604         t.completedPieces.IterTyped(func(piece int) (again bool) {
605                 bf[piece] = true
606                 return true
607         })
608         return
609 }
610
611 func (t *torrent) validOutgoingRequest(r request) bool {
612         if r.Index >= pp.Integer(t.Info.NumPieces()) {
613                 return false
614         }
615         if r.Begin%t.chunkSize != 0 {
616                 return false
617         }
618         if r.Length > t.chunkSize {
619                 return false
620         }
621         pieceLength := t.pieceLength(int(r.Index))
622         if r.Begin+r.Length > pieceLength {
623                 return false
624         }
625         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
626 }
627
628 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
629         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
630         var cs chunkSpec
631         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
632                 cs.Length = left
633                 if cs.Length > t.chunkSize {
634                         cs.Length = t.chunkSize
635                 }
636                 css = append(css, cs)
637                 cs.Begin += cs.Length
638         }
639         return
640 }
641
642 func (t *torrent) pieceNumChunks(piece int) int {
643         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
644 }
645
646 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
647         t.Pieces[pieceIndex].DirtyChunks.Clear()
648 }
649
650 type Peer struct {
651         Id     [20]byte
652         IP     net.IP
653         Port   int
654         Source peerSource
655         // Peer is known to support encryption.
656         SupportsEncryption bool
657 }
658
659 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
660         if piece < 0 || piece > t.Info.NumPieces() {
661                 return
662         }
663         if int(piece) == t.numPieces()-1 {
664                 len_ = pp.Integer(t.length % t.Info.PieceLength)
665         }
666         if len_ == 0 {
667                 len_ = pp.Integer(t.Info.PieceLength)
668         }
669         return
670 }
671
672 func (t *torrent) hashPiece(piece int) (ret pieceSum) {
673         hash := pieceHash.New()
674         p := &t.Pieces[piece]
675         p.waitNoPendingWrites()
676         ip := t.Info.Piece(piece)
677         pl := ip.Length()
678         n, err := io.Copy(hash, io.NewSectionReader(t.data, ip.Offset(), pl))
679         if n == pl {
680                 missinggo.CopyExact(&ret, hash.Sum(nil))
681                 return
682         }
683         if err != io.ErrUnexpectedEOF {
684                 log.Printf("unexpected error hashing piece with %T: %s", t.data, err)
685         }
686         return
687 }
688
689 func (t *torrent) haveAllPieces() bool {
690         if !t.haveInfo() {
691                 return false
692         }
693         return t.completedPieces.Len() == t.numPieces()
694 }
695
696 func (me *torrent) haveAnyPieces() bool {
697         for i := range me.Pieces {
698                 if me.pieceComplete(i) {
699                         return true
700                 }
701         }
702         return false
703 }
704
705 func (t *torrent) havePiece(index int) bool {
706         return t.haveInfo() && t.pieceComplete(index)
707 }
708
709 func (t *torrent) haveChunk(r request) (ret bool) {
710         // defer func() {
711         //      log.Println("have chunk", r, ret)
712         // }()
713         if !t.haveInfo() {
714                 return false
715         }
716         if t.pieceComplete(int(r.Index)) {
717                 return true
718         }
719         p := &t.Pieces[r.Index]
720         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
721 }
722
723 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
724         return int(cs.Begin / chunkSize)
725 }
726
727 // TODO: This should probably be called wantPiece.
728 func (t *torrent) wantChunk(r request) bool {
729         if !t.wantPiece(int(r.Index)) {
730                 return false
731         }
732         if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
733                 return true
734         }
735         // TODO: What about pieces that were wanted, but aren't now, and aren't
736         // completed either? That used to be done here.
737         return false
738 }
739
740 // TODO: This should be called wantPieceIndex.
741 func (t *torrent) wantPiece(index int) bool {
742         if !t.haveInfo() {
743                 return false
744         }
745         p := &t.Pieces[index]
746         if p.QueuedForHash {
747                 return false
748         }
749         if p.Hashing {
750                 return false
751         }
752         if t.pieceComplete(index) {
753                 return false
754         }
755         if t.pendingPieces.Contains(index) {
756                 return true
757         }
758         return !t.forReaderOffsetPieces(func(begin, end int) bool {
759                 return index < begin || index >= end
760         })
761 }
762
763 func (t *torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
764         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
765                 for i := begin; begin < end; i++ {
766                         if !f(i) {
767                                 return false
768                         }
769                 }
770                 return true
771         })
772 }
773
774 func (t *torrent) connHasWantedPieces(c *connection) bool {
775         return !c.pieceRequestOrder.IsEmpty()
776 }
777
778 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
779         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
780                 pieces = append(pieces, int(i))
781         }
782         return
783 }
784
785 func (t *torrent) worstBadConn(cl *Client) *connection {
786         wcs := t.worstConns(cl)
787         heap.Init(wcs)
788         for wcs.Len() != 0 {
789                 c := heap.Pop(wcs).(*connection)
790                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
791                         return c
792                 }
793                 if wcs.Len() >= (socketsPerTorrent+1)/2 {
794                         // Give connections 1 minute to prove themselves.
795                         if time.Since(c.completedHandshake) > time.Minute {
796                                 return c
797                         }
798                 }
799         }
800         return nil
801 }
802
803 type PieceStateChange struct {
804         Index int
805         PieceState
806 }
807
808 func (t *torrent) publishPieceChange(piece int) {
809         cur := t.pieceState(piece)
810         p := &t.Pieces[piece]
811         if cur != p.PublicPieceState {
812                 p.PublicPieceState = cur
813                 t.pieceStateChanges.Publish(PieceStateChange{
814                         piece,
815                         cur,
816                 })
817         }
818 }
819
820 func (t *torrent) pieceNumPendingChunks(piece int) int {
821         if t.pieceComplete(piece) {
822                 return 0
823         }
824         return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
825 }
826
827 func (t *torrent) pieceAllDirty(piece int) bool {
828         return t.Pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
829 }
830
831 func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
832         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
833                 if begin < end {
834                         if !f(begin) {
835                                 return false
836                         }
837                 }
838                 return true
839         })
840 }
841
842 func (t *torrent) readersChanged() {
843         t.updatePiecePriorities()
844 }
845
846 func (t *torrent) maybeNewConns() {
847         // Tickle the accept routine.
848         t.cl.event.Broadcast()
849         t.openNewConns()
850 }
851
852 func (t *torrent) piecePriorityChanged(piece int) {
853         for _, c := range t.Conns {
854                 c.updatePiecePriority(piece)
855         }
856         t.maybeNewConns()
857         t.publishPieceChange(piece)
858 }
859
860 func (t *torrent) updatePiecePriority(piece int) bool {
861         p := &t.Pieces[piece]
862         newPrio := t.piecePriorityUncached(piece)
863         if newPrio == p.priority {
864                 return false
865         }
866         p.priority = newPrio
867         return true
868 }
869
870 // Update all piece priorities in one hit. This function should have the same
871 // output as updatePiecePriority, but across all pieces.
872 func (t *torrent) updatePiecePriorities() {
873         newPrios := make([]piecePriority, t.numPieces())
874         t.pendingPieces.IterTyped(func(piece int) (more bool) {
875                 newPrios[piece] = PiecePriorityNormal
876                 return true
877         })
878         t.forReaderOffsetPieces(func(begin, end int) (next bool) {
879                 if begin < end {
880                         newPrios[begin].Raise(PiecePriorityNow)
881                 }
882                 for i := begin + 1; i < end; i++ {
883                         newPrios[i].Raise(PiecePriorityReadahead)
884                 }
885                 return true
886         })
887         t.completedPieces.IterTyped(func(piece int) (more bool) {
888                 newPrios[piece] = PiecePriorityNone
889                 return true
890         })
891         for i, prio := range newPrios {
892                 if prio != t.Pieces[i].priority {
893                         t.Pieces[i].priority = prio
894                         t.piecePriorityChanged(i)
895                 }
896         }
897 }
898
899 func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
900         if off >= t.length {
901                 return
902         }
903         if off < 0 {
904                 size += off
905                 off = 0
906         }
907         if size <= 0 {
908                 return
909         }
910         begin = int(off / t.Info.PieceLength)
911         end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
912         if end > t.Info.NumPieces() {
913                 end = t.Info.NumPieces()
914         }
915         return
916 }
917
918 // Returns true if all iterations complete without breaking.
919 func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
920         for r := range t.readers {
921                 r.mu.Lock()
922                 pos, readahead := r.pos, r.readahead
923                 r.mu.Unlock()
924                 if readahead < 1 {
925                         readahead = 1
926                 }
927                 begin, end := t.byteRegionPieces(pos, readahead)
928                 if begin >= end {
929                         continue
930                 }
931                 if !f(begin, end) {
932                         return false
933                 }
934         }
935         return true
936 }
937
938 func (t *torrent) piecePriority(piece int) piecePriority {
939         if !t.haveInfo() {
940                 return PiecePriorityNone
941         }
942         return t.Pieces[piece].priority
943 }
944
945 func (t *torrent) piecePriorityUncached(piece int) (ret piecePriority) {
946         ret = PiecePriorityNone
947         if t.pieceComplete(piece) {
948                 return
949         }
950         if t.pendingPieces.Contains(piece) {
951                 ret = PiecePriorityNormal
952         }
953         raiseRet := ret.Raise
954         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
955                 if piece == begin {
956                         raiseRet(PiecePriorityNow)
957                 }
958                 if begin <= piece && piece < end {
959                         raiseRet(PiecePriorityReadahead)
960                 }
961                 return true
962         })
963         return
964 }
965
966 func (t *torrent) pendPiece(piece int) {
967         if t.pendingPieces.Contains(piece) {
968                 return
969         }
970         if t.havePiece(piece) {
971                 return
972         }
973         t.pendingPieces.Add(piece)
974         if !t.updatePiecePriority(piece) {
975                 return
976         }
977         t.piecePriorityChanged(piece)
978 }
979
980 func (t *torrent) getCompletedPieces() (ret bitmap.Bitmap) {
981         return t.completedPieces.Copy()
982 }
983
984 func (t *torrent) unpendPieces(unpend *bitmap.Bitmap) {
985         t.pendingPieces.Sub(unpend)
986         t.updatePiecePriorities()
987 }
988
989 func (t *torrent) pendPieceRange(begin, end int) {
990         for i := begin; i < end; i++ {
991                 t.pendPiece(i)
992         }
993 }
994
995 func (t *torrent) unpendPieceRange(begin, end int) {
996         var bm bitmap.Bitmap
997         bm.AddRange(begin, end)
998         t.unpendPieces(&bm)
999 }
1000
1001 func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
1002         if !c.PeerHasPiece(piece) {
1003                 return true
1004         }
1005         chunkIndices := t.Pieces[piece].undirtiedChunkIndices().ToSortedSlice()
1006         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
1007                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
1008                 return c.Request(req)
1009         })
1010 }
1011
1012 func (t *torrent) pendRequest(req request) {
1013         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1014         t.Pieces[req.Index].pendChunkIndex(ci)
1015 }
1016
1017 func (t *torrent) pieceChanged(piece int) {
1018         t.cl.pieceChanged(t, piece)
1019 }
1020
1021 func (t *torrent) openNewConns() {
1022         t.cl.openNewConns(t)
1023 }
1024
1025 func (t *torrent) getConnPieceInclination() []int {
1026         _ret := t.connPieceInclinationPool.Get()
1027         if _ret == nil {
1028                 pieceInclinationsNew.Add(1)
1029                 return rand.Perm(t.numPieces())
1030         }
1031         pieceInclinationsReused.Add(1)
1032         return _ret.([]int)
1033 }
1034
1035 func (t *torrent) putPieceInclination(pi []int) {
1036         t.connPieceInclinationPool.Put(pi)
1037         pieceInclinationsPut.Add(1)
1038 }
1039
1040 func (t *torrent) updatePieceCompletion(piece int) {
1041         t.completedPieces.Set(piece, t.pieceCompleteUncached(piece))
1042 }