]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
6e726a6eceb57dff784b836098906ad02d45ffd8
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "expvar"
6         "fmt"
7         "io"
8         "log"
9         "math/rand"
10         "net"
11         "sort"
12         "sync"
13         "time"
14
15         "github.com/anacrolix/missinggo"
16         "github.com/anacrolix/missinggo/bitmap"
17         "github.com/anacrolix/missinggo/itertools"
18         "github.com/anacrolix/missinggo/perf"
19         "github.com/anacrolix/missinggo/pubsub"
20
21         "github.com/anacrolix/torrent/bencode"
22         "github.com/anacrolix/torrent/metainfo"
23         pp "github.com/anacrolix/torrent/peer_protocol"
24 )
25
26 func (t *torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
27         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
28 }
29
30 func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
31         if t.pieceComplete(index) {
32                 return
33         }
34         piece := &t.Pieces[index]
35         count = t.pieceLength(index)
36         if !piece.EverHashed {
37                 return
38         }
39         regularDirty := piece.numDirtyChunks()
40         lastChunkIndex := t.pieceNumChunks(index) - 1
41         if piece.pendingChunkIndex(lastChunkIndex) {
42                 regularDirty--
43                 count -= t.chunkIndexSpec(lastChunkIndex, index).Length
44         }
45         count -= pp.Integer(regularDirty) * t.chunkSize
46         return
47 }
48
49 type peersKey struct {
50         IPBytes string
51         Port    int
52 }
53
54 // Is not aware of Client. Maintains state of torrent for with-in a Client.
55 type torrent struct {
56         cl *Client
57
58         stateMu sync.Mutex
59         closing chan struct{}
60
61         // Closed when no more network activity is desired. This includes
62         // announcing, and communicating with peers.
63         ceasingNetworking chan struct{}
64
65         InfoHash InfoHash
66         Pieces   []piece
67         // Values are the piece indices that changed.
68         pieceStateChanges *pubsub.PubSub
69         chunkSize         pp.Integer
70         // Total length of the torrent in bytes. Stored because it's not O(1) to
71         // get this from the info dict.
72         length int64
73
74         data Data
75
76         // The info dict. Nil if we don't have it (yet).
77         Info *metainfo.Info
78         // Active peer connections, running message stream loops.
79         Conns []*connection
80         // Set of addrs to which we're attempting to connect. Connections are
81         // half-open until all handshakes are completed.
82         HalfOpen map[string]struct{}
83
84         // Reserve of peers to connect to. A peer can be both here and in the
85         // active connections if were told about the peer after connecting with
86         // them. That encourages us to reconnect to peers that are well known.
87         Peers     map[peersKey]Peer
88         wantPeers sync.Cond
89
90         // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
91         // mirror their respective URLs from the announce-list metainfo key.
92         Trackers []trackerTier
93         // Name used if the info name isn't available.
94         displayName string
95         // The bencoded bytes of the info dict.
96         MetaData []byte
97         // Each element corresponds to the 16KiB metadata pieces. If true, we have
98         // received that piece.
99         metadataHave []bool
100
101         // Closed when .Info is set.
102         gotMetainfo chan struct{}
103
104         readers map[*Reader]struct{}
105
106         pendingPieces   bitmap.Bitmap
107         completedPieces bitmap.Bitmap
108
109         connPieceInclinationPool sync.Pool
110 }
111
112 var (
113         pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
114         pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
115         pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
116 )
117
118 func (t *torrent) setDisplayName(dn string) {
119         t.displayName = dn
120 }
121
122 func (t *torrent) pieceComplete(piece int) bool {
123         return t.completedPieces.Get(piece)
124 }
125
126 func (t *torrent) pieceCompleteUncached(piece int) bool {
127         // TODO: This is called when setting metadata, and before storage is
128         // assigned, which doesn't seem right.
129         return t.data != nil && t.data.PieceComplete(piece)
130 }
131
132 func (t *torrent) numConnsUnchoked() (num int) {
133         for _, c := range t.Conns {
134                 if !c.PeerChoked {
135                         num++
136                 }
137         }
138         return
139 }
140
141 // There's a connection to that address already.
142 func (t *torrent) addrActive(addr string) bool {
143         if _, ok := t.HalfOpen[addr]; ok {
144                 return true
145         }
146         for _, c := range t.Conns {
147                 if c.remoteAddr().String() == addr {
148                         return true
149                 }
150         }
151         return false
152 }
153
154 func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
155         wcs = &worstConns{
156                 c:  make([]*connection, 0, len(t.Conns)),
157                 t:  t,
158                 cl: cl,
159         }
160         for _, c := range t.Conns {
161                 if !c.closed.IsSet() {
162                         wcs.c = append(wcs.c, c)
163                 }
164         }
165         return
166 }
167
168 func (t *torrent) ceaseNetworking() {
169         t.stateMu.Lock()
170         defer t.stateMu.Unlock()
171         select {
172         case <-t.ceasingNetworking:
173                 return
174         default:
175         }
176         close(t.ceasingNetworking)
177         for _, c := range t.Conns {
178                 c.Close()
179         }
180 }
181
182 func (t *torrent) addPeer(p Peer, cl *Client) {
183         cl.openNewConns(t)
184         if len(t.Peers) >= torrentPeersHighWater {
185                 return
186         }
187         key := peersKey{string(p.IP), p.Port}
188         if _, ok := t.Peers[key]; ok {
189                 return
190         }
191         t.Peers[key] = p
192         peersAddedBySource.Add(string(p.Source), 1)
193         cl.openNewConns(t)
194
195 }
196
197 func (t *torrent) invalidateMetadata() {
198         t.MetaData = nil
199         t.metadataHave = nil
200         t.Info = nil
201 }
202
203 func (t *torrent) saveMetadataPiece(index int, data []byte) {
204         if t.haveInfo() {
205                 return
206         }
207         if index >= len(t.metadataHave) {
208                 log.Printf("%s: ignoring metadata piece %d", t, index)
209                 return
210         }
211         copy(t.MetaData[(1<<14)*index:], data)
212         t.metadataHave[index] = true
213 }
214
215 func (t *torrent) metadataPieceCount() int {
216         return (len(t.MetaData) + (1 << 14) - 1) / (1 << 14)
217 }
218
219 func (t *torrent) haveMetadataPiece(piece int) bool {
220         if t.haveInfo() {
221                 return (1<<14)*piece < len(t.MetaData)
222         } else {
223                 return piece < len(t.metadataHave) && t.metadataHave[piece]
224         }
225 }
226
227 func (t *torrent) metadataSizeKnown() bool {
228         return t.MetaData != nil
229 }
230
231 func (t *torrent) metadataSize() int {
232         return len(t.MetaData)
233 }
234
235 func infoPieceHashes(info *metainfo.Info) (ret []string) {
236         for i := 0; i < len(info.Pieces); i += 20 {
237                 ret = append(ret, string(info.Pieces[i:i+20]))
238         }
239         return
240 }
241
242 // Called when metadata for a torrent becomes available.
243 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
244         err = validateInfo(md)
245         if err != nil {
246                 err = fmt.Errorf("bad info: %s", err)
247                 return
248         }
249         t.Info = md
250         t.length = 0
251         for _, f := range t.Info.UpvertedFiles() {
252                 t.length += f.Length
253         }
254         t.MetaData = infoBytes
255         t.metadataHave = nil
256         hashes := infoPieceHashes(md)
257         t.Pieces = make([]piece, len(hashes))
258         for i, hash := range hashes {
259                 piece := &t.Pieces[i]
260                 piece.t = t
261                 piece.index = i
262                 piece.noPendingWrites.L = &piece.pendingWritesMutex
263                 missinggo.CopyExact(piece.Hash[:], hash)
264         }
265         for _, conn := range t.Conns {
266                 if err := conn.setNumPieces(t.numPieces()); err != nil {
267                         log.Printf("closing connection: %s", err)
268                         conn.Close()
269                 }
270         }
271         return
272 }
273
274 func (t *torrent) setStorage(td Data) {
275         if t.data != nil {
276                 t.data.Close()
277         }
278         t.data = td
279         for i := range t.Pieces {
280                 t.updatePieceCompletion(i)
281                 t.Pieces[i].QueuedForHash = true
282         }
283         go func() {
284                 for i := range t.Pieces {
285                         t.verifyPiece(i)
286                 }
287         }()
288 }
289
290 func (t *torrent) verifyPiece(piece int) {
291         t.cl.verifyPiece(t, piece)
292 }
293
294 func (t *torrent) haveAllMetadataPieces() bool {
295         if t.haveInfo() {
296                 return true
297         }
298         if t.metadataHave == nil {
299                 return false
300         }
301         for _, have := range t.metadataHave {
302                 if !have {
303                         return false
304                 }
305         }
306         return true
307 }
308
309 func (t *torrent) setMetadataSize(bytes int64, cl *Client) {
310         if t.haveInfo() {
311                 // We already know the correct metadata size.
312                 return
313         }
314         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
315                 log.Printf("received bad metadata size: %d", bytes)
316                 return
317         }
318         if t.MetaData != nil && len(t.MetaData) == int(bytes) {
319                 return
320         }
321         t.MetaData = make([]byte, bytes)
322         t.metadataHave = make([]bool, (bytes+(1<<14)-1)/(1<<14))
323         for _, c := range t.Conns {
324                 cl.requestPendingMetadata(t, c)
325         }
326
327 }
328
329 // The current working name for the torrent. Either the name in the info dict,
330 // or a display name given such as by the dn value in a magnet link, or "".
331 func (t *torrent) Name() string {
332         if t.haveInfo() {
333                 return t.Info.Name
334         }
335         return t.displayName
336 }
337
338 func (t *torrent) pieceState(index int) (ret PieceState) {
339         p := &t.Pieces[index]
340         ret.Priority = t.piecePriority(index)
341         if t.pieceComplete(index) {
342                 ret.Complete = true
343         }
344         if p.QueuedForHash || p.Hashing {
345                 ret.Checking = true
346         }
347         if !ret.Complete && t.piecePartiallyDownloaded(index) {
348                 ret.Partial = true
349         }
350         return
351 }
352
353 func (t *torrent) metadataPieceSize(piece int) int {
354         return metadataPieceSize(len(t.MetaData), piece)
355 }
356
357 func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
358         d := map[string]int{
359                 "msg_type": msgType,
360                 "piece":    piece,
361         }
362         if data != nil {
363                 d["total_size"] = len(t.MetaData)
364         }
365         p, err := bencode.Marshal(d)
366         if err != nil {
367                 panic(err)
368         }
369         return pp.Message{
370                 Type:            pp.Extended,
371                 ExtendedID:      byte(c.PeerExtensionIDs["ut_metadata"]),
372                 ExtendedPayload: append(p, data...),
373         }
374 }
375
376 func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
377         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
378                 ret = append(ret, PieceStateRun{
379                         PieceState: el.(PieceState),
380                         Length:     int(count),
381                 })
382         })
383         for index := range t.Pieces {
384                 rle.Append(t.pieceState(index), 1)
385         }
386         rle.Flush()
387         return
388 }
389
390 // Produces a small string representing a PieceStateRun.
391 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
392         ret = fmt.Sprintf("%d", psr.Length)
393         ret += func() string {
394                 switch psr.Priority {
395                 case PiecePriorityNext:
396                         return "N"
397                 case PiecePriorityNormal:
398                         return "."
399                 case PiecePriorityReadahead:
400                         return "R"
401                 case PiecePriorityNow:
402                         return "!"
403                 default:
404                         return ""
405                 }
406         }()
407         if psr.Checking {
408                 ret += "H"
409         }
410         if psr.Partial {
411                 ret += "P"
412         }
413         if psr.Complete {
414                 ret += "C"
415         }
416         return
417 }
418
419 func (t *torrent) writeStatus(w io.Writer, cl *Client) {
420         fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
421         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
422         if !t.haveInfo() {
423                 fmt.Fprintf(w, "Metadata have: ")
424                 for _, h := range t.metadataHave {
425                         fmt.Fprintf(w, "%c", func() rune {
426                                 if h {
427                                         return 'H'
428                                 } else {
429                                         return '.'
430                                 }
431                         }())
432                 }
433                 fmt.Fprintln(w)
434         }
435         fmt.Fprintf(w, "Piece length: %s\n", func() string {
436                 if t.haveInfo() {
437                         return fmt.Sprint(t.usualPieceSize())
438                 } else {
439                         return "?"
440                 }
441         }())
442         if t.haveInfo() {
443                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
444                 fmt.Fprint(w, "Piece States:")
445                 for _, psr := range t.pieceStateRuns() {
446                         w.Write([]byte(" "))
447                         w.Write([]byte(pieceStateRunStatusChars(psr)))
448                 }
449                 fmt.Fprintln(w)
450         }
451         fmt.Fprintf(w, "Reader Pieces:")
452         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
453                 fmt.Fprintf(w, " %d:%d", begin, end)
454                 return true
455         })
456         fmt.Fprintln(w)
457         fmt.Fprintf(w, "Trackers: ")
458         for _, tier := range t.Trackers {
459                 for _, tr := range tier {
460                         fmt.Fprintf(w, "%q ", tr)
461                 }
462         }
463         fmt.Fprintf(w, "\n")
464         fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
465         fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
466         fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
467         sort.Sort(&worstConns{
468                 c:  t.Conns,
469                 t:  t,
470                 cl: cl,
471         })
472         for i, c := range t.Conns {
473                 fmt.Fprintf(w, "%2d. ", i+1)
474                 c.WriteStatus(w, t)
475         }
476 }
477
478 func (t *torrent) String() string {
479         s := t.Name()
480         if s == "" {
481                 s = fmt.Sprintf("%x", t.InfoHash)
482         }
483         return s
484 }
485
486 func (t *torrent) haveInfo() bool {
487         return t.Info != nil
488 }
489
490 // TODO: Include URIs that weren't converted to tracker clients.
491 func (t *torrent) announceList() (al [][]string) {
492         missinggo.CastSlice(&al, t.Trackers)
493         return
494 }
495
496 // Returns a run-time generated MetaInfo that includes the info bytes and
497 // announce-list as currently known to the client.
498 func (t *torrent) MetaInfo() *metainfo.MetaInfo {
499         if t.MetaData == nil {
500                 panic("info bytes not set")
501         }
502         return &metainfo.MetaInfo{
503                 Info: metainfo.InfoEx{
504                         Info:  *t.Info,
505                         Bytes: t.MetaData,
506                 },
507                 CreationDate: time.Now().Unix(),
508                 Comment:      "dynamic metainfo from client",
509                 CreatedBy:    "go.torrent",
510                 AnnounceList: t.announceList(),
511         }
512 }
513
514 func (t *torrent) bytesLeft() (left int64) {
515         if !t.haveInfo() {
516                 return -1
517         }
518         for i := 0; i < t.numPieces(); i++ {
519                 left += int64(t.pieceNumPendingBytes(i))
520         }
521         return
522 }
523
524 func (t *torrent) piecePartiallyDownloaded(piece int) bool {
525         if t.pieceComplete(piece) {
526                 return false
527         }
528         if t.pieceAllDirty(piece) {
529                 return false
530         }
531         return t.Pieces[piece].hasDirtyChunks()
532 }
533
534 func numChunksForPiece(chunkSize int, pieceSize int) int {
535         return (pieceSize + chunkSize - 1) / chunkSize
536 }
537
538 func (t *torrent) usualPieceSize() int {
539         return int(t.Info.PieceLength)
540 }
541
542 func (t *torrent) lastPieceSize() int {
543         return int(t.pieceLength(t.numPieces() - 1))
544 }
545
546 func (t *torrent) numPieces() int {
547         return t.Info.NumPieces()
548 }
549
550 func (t *torrent) numPiecesCompleted() (num int) {
551         return t.completedPieces.Len()
552 }
553
554 // Safe to call with or without client lock.
555 func (t *torrent) isClosed() bool {
556         select {
557         case <-t.closing:
558                 return true
559         default:
560                 return false
561         }
562 }
563
564 func (t *torrent) close() (err error) {
565         if t.isClosed() {
566                 return
567         }
568         t.ceaseNetworking()
569         close(t.closing)
570         if c, ok := t.data.(io.Closer); ok {
571                 c.Close()
572         }
573         for _, conn := range t.Conns {
574                 conn.Close()
575         }
576         t.pieceStateChanges.Close()
577         return
578 }
579
580 func (t *torrent) requestOffset(r request) int64 {
581         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
582 }
583
584 // Return the request that would include the given offset into the torrent
585 // data. Returns !ok if there is no such request.
586 func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
587         return torrentOffsetRequest(t.length, t.Info.PieceLength, int64(t.chunkSize), off)
588 }
589
590 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
591         tr := perf.NewTimer()
592         n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
593         if err == nil && n != len(data) {
594                 err = io.ErrShortWrite
595         }
596         if err == nil {
597                 tr.Stop("write chunk")
598         }
599         return
600 }
601
602 func (t *torrent) bitfield() (bf []bool) {
603         bf = make([]bool, t.numPieces())
604         t.completedPieces.IterTyped(func(piece int) (again bool) {
605                 bf[piece] = true
606                 return true
607         })
608         return
609 }
610
611 func (t *torrent) validOutgoingRequest(r request) bool {
612         if r.Index >= pp.Integer(t.Info.NumPieces()) {
613                 return false
614         }
615         if r.Begin%t.chunkSize != 0 {
616                 return false
617         }
618         if r.Length > t.chunkSize {
619                 return false
620         }
621         pieceLength := t.pieceLength(int(r.Index))
622         if r.Begin+r.Length > pieceLength {
623                 return false
624         }
625         return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
626 }
627
628 func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
629         css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
630         var cs chunkSpec
631         for left := t.pieceLength(piece); left != 0; left -= cs.Length {
632                 cs.Length = left
633                 if cs.Length > t.chunkSize {
634                         cs.Length = t.chunkSize
635                 }
636                 css = append(css, cs)
637                 cs.Begin += cs.Length
638         }
639         return
640 }
641
642 func (t *torrent) pieceNumChunks(piece int) int {
643         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
644 }
645
646 func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
647         t.Pieces[pieceIndex].DirtyChunks.Clear()
648 }
649
650 type Peer struct {
651         Id     [20]byte
652         IP     net.IP
653         Port   int
654         Source peerSource
655         // Peer is known to support encryption.
656         SupportsEncryption bool
657 }
658
659 func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
660         if piece < 0 || piece > t.Info.NumPieces() {
661                 return
662         }
663         if int(piece) == t.numPieces()-1 {
664                 len_ = pp.Integer(t.length % t.Info.PieceLength)
665         }
666         if len_ == 0 {
667                 len_ = pp.Integer(t.Info.PieceLength)
668         }
669         return
670 }
671
672 func (t *torrent) hashPiece(piece int) (ps pieceSum) {
673         hash := pieceHash.New()
674         p := &t.Pieces[piece]
675         p.waitNoPendingWrites()
676         pl := t.Info.Piece(int(piece)).Length()
677         n, err := t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, pl)
678         if err != nil {
679                 if err != io.ErrUnexpectedEOF {
680                         log.Printf("error hashing piece with %T: %s", t.data, err)
681                 }
682                 return
683         }
684         if n != pl {
685                 panic(fmt.Sprintf("%T: %d != %d", t.data, n, pl))
686         }
687         missinggo.CopyExact(ps[:], hash.Sum(nil))
688         return
689 }
690
691 func (t *torrent) haveAllPieces() bool {
692         if !t.haveInfo() {
693                 return false
694         }
695         return t.completedPieces.Len() == t.numPieces()
696 }
697
698 func (me *torrent) haveAnyPieces() bool {
699         for i := range me.Pieces {
700                 if me.pieceComplete(i) {
701                         return true
702                 }
703         }
704         return false
705 }
706
707 func (t *torrent) havePiece(index int) bool {
708         return t.haveInfo() && t.pieceComplete(index)
709 }
710
711 func (t *torrent) haveChunk(r request) (ret bool) {
712         // defer func() {
713         //      log.Println("have chunk", r, ret)
714         // }()
715         if !t.haveInfo() {
716                 return false
717         }
718         if t.pieceComplete(int(r.Index)) {
719                 return true
720         }
721         p := &t.Pieces[r.Index]
722         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
723 }
724
725 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
726         return int(cs.Begin / chunkSize)
727 }
728
729 // TODO: This should probably be called wantPiece.
730 func (t *torrent) wantChunk(r request) bool {
731         if !t.wantPiece(int(r.Index)) {
732                 return false
733         }
734         if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
735                 return true
736         }
737         // TODO: What about pieces that were wanted, but aren't now, and aren't
738         // completed either? That used to be done here.
739         return false
740 }
741
742 // TODO: This should be called wantPieceIndex.
743 func (t *torrent) wantPiece(index int) bool {
744         if !t.haveInfo() {
745                 return false
746         }
747         p := &t.Pieces[index]
748         if p.QueuedForHash {
749                 return false
750         }
751         if p.Hashing {
752                 return false
753         }
754         if t.pieceComplete(index) {
755                 return false
756         }
757         if t.pendingPieces.Contains(index) {
758                 return true
759         }
760         return !t.forReaderOffsetPieces(func(begin, end int) bool {
761                 return index < begin || index >= end
762         })
763 }
764
765 func (t *torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
766         return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
767                 for i := begin; begin < end; i++ {
768                         if !f(i) {
769                                 return false
770                         }
771                 }
772                 return true
773         })
774 }
775
776 func (t *torrent) connHasWantedPieces(c *connection) bool {
777         return !c.pieceRequestOrder.IsEmpty()
778 }
779
780 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
781         for i := off / int64(t.usualPieceSize()); i*int64(t.usualPieceSize()) < off+_len; i++ {
782                 pieces = append(pieces, int(i))
783         }
784         return
785 }
786
787 func (t *torrent) worstBadConn(cl *Client) *connection {
788         wcs := t.worstConns(cl)
789         heap.Init(wcs)
790         for wcs.Len() != 0 {
791                 c := heap.Pop(wcs).(*connection)
792                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
793                         return c
794                 }
795                 if wcs.Len() >= (socketsPerTorrent+1)/2 {
796                         // Give connections 1 minute to prove themselves.
797                         if time.Since(c.completedHandshake) > time.Minute {
798                                 return c
799                         }
800                 }
801         }
802         return nil
803 }
804
805 type PieceStateChange struct {
806         Index int
807         PieceState
808 }
809
810 func (t *torrent) publishPieceChange(piece int) {
811         cur := t.pieceState(piece)
812         p := &t.Pieces[piece]
813         if cur != p.PublicPieceState {
814                 p.PublicPieceState = cur
815                 t.pieceStateChanges.Publish(PieceStateChange{
816                         piece,
817                         cur,
818                 })
819         }
820 }
821
822 func (t *torrent) pieceNumPendingChunks(piece int) int {
823         if t.pieceComplete(piece) {
824                 return 0
825         }
826         return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
827 }
828
829 func (t *torrent) pieceAllDirty(piece int) bool {
830         return t.Pieces[piece].DirtyChunks.Len() == t.pieceNumChunks(piece)
831 }
832
833 func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
834         return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
835                 if begin < end {
836                         if !f(begin) {
837                                 return false
838                         }
839                 }
840                 return true
841         })
842 }
843
844 func (t *torrent) readersChanged() {
845         t.updatePiecePriorities()
846 }
847
848 func (t *torrent) maybeNewConns() {
849         // Tickle the accept routine.
850         t.cl.event.Broadcast()
851         t.openNewConns()
852 }
853
854 func (t *torrent) piecePriorityChanged(piece int) {
855         for _, c := range t.Conns {
856                 c.updatePiecePriority(piece)
857         }
858         t.maybeNewConns()
859         t.publishPieceChange(piece)
860 }
861
862 func (t *torrent) updatePiecePriority(piece int) bool {
863         p := &t.Pieces[piece]
864         newPrio := t.piecePriorityUncached(piece)
865         if newPrio == p.priority {
866                 return false
867         }
868         p.priority = newPrio
869         return true
870 }
871
872 // Update all piece priorities in one hit. This function should have the same
873 // output as updatePiecePriority, but across all pieces.
874 func (t *torrent) updatePiecePriorities() {
875         newPrios := make([]piecePriority, t.numPieces())
876         t.pendingPieces.IterTyped(func(piece int) (more bool) {
877                 newPrios[piece] = PiecePriorityNormal
878                 return true
879         })
880         t.forReaderOffsetPieces(func(begin, end int) (next bool) {
881                 if begin < end {
882                         newPrios[begin].Raise(PiecePriorityNow)
883                 }
884                 for i := begin + 1; i < end; i++ {
885                         newPrios[i].Raise(PiecePriorityReadahead)
886                 }
887                 return true
888         })
889         t.completedPieces.IterTyped(func(piece int) (more bool) {
890                 newPrios[piece] = PiecePriorityNone
891                 return true
892         })
893         for i, prio := range newPrios {
894                 if prio != t.Pieces[i].priority {
895                         t.Pieces[i].priority = prio
896                         t.piecePriorityChanged(i)
897                 }
898         }
899 }
900
901 func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
902         if off >= t.length {
903                 return
904         }
905         if off < 0 {
906                 size += off
907                 off = 0
908         }
909         if size <= 0 {
910                 return
911         }
912         begin = int(off / t.Info.PieceLength)
913         end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
914         if end > t.Info.NumPieces() {
915                 end = t.Info.NumPieces()
916         }
917         return
918 }
919
920 // Returns true if all iterations complete without breaking.
921 func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
922         for r := range t.readers {
923                 r.mu.Lock()
924                 pos, readahead := r.pos, r.readahead
925                 r.mu.Unlock()
926                 if readahead < 1 {
927                         readahead = 1
928                 }
929                 begin, end := t.byteRegionPieces(pos, readahead)
930                 if begin >= end {
931                         continue
932                 }
933                 if !f(begin, end) {
934                         return false
935                 }
936         }
937         return true
938 }
939
940 func (t *torrent) piecePriority(piece int) piecePriority {
941         if !t.haveInfo() {
942                 return PiecePriorityNone
943         }
944         return t.Pieces[piece].priority
945 }
946
947 func (t *torrent) piecePriorityUncached(piece int) (ret piecePriority) {
948         ret = PiecePriorityNone
949         if t.pieceComplete(piece) {
950                 return
951         }
952         if t.pendingPieces.Contains(piece) {
953                 ret = PiecePriorityNormal
954         }
955         raiseRet := ret.Raise
956         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
957                 if piece == begin {
958                         raiseRet(PiecePriorityNow)
959                 }
960                 if begin <= piece && piece < end {
961                         raiseRet(PiecePriorityReadahead)
962                 }
963                 return true
964         })
965         return
966 }
967
968 func (t *torrent) pendPiece(piece int) {
969         if t.pendingPieces.Contains(piece) {
970                 return
971         }
972         if t.havePiece(piece) {
973                 return
974         }
975         t.pendingPieces.Add(piece)
976         if !t.updatePiecePriority(piece) {
977                 return
978         }
979         t.piecePriorityChanged(piece)
980 }
981
982 func (t *torrent) getCompletedPieces() (ret bitmap.Bitmap) {
983         return t.completedPieces.Copy()
984 }
985
986 func (t *torrent) unpendPieces(unpend *bitmap.Bitmap) {
987         t.pendingPieces.Sub(unpend)
988         t.updatePiecePriorities()
989 }
990
991 func (t *torrent) pendPieceRange(begin, end int) {
992         for i := begin; i < end; i++ {
993                 t.pendPiece(i)
994         }
995 }
996
997 func (t *torrent) unpendPieceRange(begin, end int) {
998         var bm bitmap.Bitmap
999         bm.AddRange(begin, end)
1000         t.unpendPieces(&bm)
1001 }
1002
1003 func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
1004         if !c.PeerHasPiece(piece) {
1005                 return true
1006         }
1007         chunkIndices := t.Pieces[piece].undirtiedChunkIndices().ToSortedSlice()
1008         return itertools.ForPerm(len(chunkIndices), func(i int) bool {
1009                 req := request{pp.Integer(piece), t.chunkIndexSpec(chunkIndices[i], piece)}
1010                 return c.Request(req)
1011         })
1012 }
1013
1014 func (t *torrent) pendRequest(req request) {
1015         ci := chunkIndex(req.chunkSpec, t.chunkSize)
1016         t.Pieces[req.Index].pendChunkIndex(ci)
1017 }
1018
1019 func (t *torrent) pieceChanged(piece int) {
1020         t.cl.pieceChanged(t, piece)
1021 }
1022
1023 func (t *torrent) openNewConns() {
1024         t.cl.openNewConns(t)
1025 }
1026
1027 func (t *torrent) getConnPieceInclination() []int {
1028         _ret := t.connPieceInclinationPool.Get()
1029         if _ret == nil {
1030                 pieceInclinationsNew.Add(1)
1031                 return rand.Perm(t.numPieces())
1032         }
1033         pieceInclinationsReused.Add(1)
1034         return _ret.([]int)
1035 }
1036
1037 func (t *torrent) putPieceInclination(pi []int) {
1038         t.connPieceInclinationPool.Put(pi)
1039         pieceInclinationsPut.Add(1)
1040 }
1041
1042 func (t *torrent) updatePieceCompletion(piece int) {
1043         t.completedPieces.Set(piece, t.pieceCompleteUncached(piece))
1044 }