]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
46575942b4367383f365db9d42d06d8fa8605e35
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math"
11         "math/rand"
12         "net"
13         "os"
14         "strconv"
15         "sync"
16         "text/tabwriter"
17         "time"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/bitmap"
22         "github.com/anacrolix/missinggo/perf"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/missinggo/slices"
25         "github.com/bradfitz/iter"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/metainfo"
29         pp "github.com/anacrolix/torrent/peer_protocol"
30         "github.com/anacrolix/torrent/storage"
31         "github.com/anacrolix/torrent/tracker"
32 )
33
34 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
35         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
36 }
37
38 type peersKey struct {
39         IPBytes string
40         Port    int
41 }
42
43 // Maintains state of torrent within a Client.
44 type Torrent struct {
45         cl *Client
46
47         networkingEnabled bool
48         requestStrategy   int
49
50         closed   missinggo.Event
51         infoHash metainfo.Hash
52         pieces   []Piece
53         // Values are the piece indices that changed.
54         pieceStateChanges *pubsub.PubSub
55         // The size of chunks to request from peers over the wire. This is
56         // normally 16KiB by convention these days.
57         chunkSize pp.Integer
58         chunkPool *sync.Pool
59         // Total length of the torrent in bytes. Stored because it's not O(1) to
60         // get this from the info dict.
61         length int64
62
63         // The storage to open when the info dict becomes available.
64         storageOpener *storage.Client
65         // Storage for torrent data.
66         storage *storage.Torrent
67         // Read-locked for using storage, and write-locked for Closing.
68         storageLock sync.RWMutex
69
70         metainfo metainfo.MetaInfo
71
72         // The info dict. nil if we don't have it (yet).
73         info *metainfo.Info
74
75         // Active peer connections, running message stream loops.
76         conns               map[*connection]struct{}
77         maxEstablishedConns int
78         // Set of addrs to which we're attempting to connect. Connections are
79         // half-open until all handshakes are completed.
80         halfOpen    map[string]Peer
81         fastestConn *connection
82
83         // Reserve of peers to connect to. A peer can be both here and in the
84         // active connections if were told about the peer after connecting with
85         // them. That encourages us to reconnect to peers that are well known in
86         // the swarm.
87         peers          map[peersKey]Peer
88         wantPeersEvent missinggo.Event
89         // An announcer for each tracker URL.
90         trackerAnnouncers map[string]*trackerScraper
91         // How many times we've initiated a DHT announce. TODO: Move into stats.
92         numDHTAnnounces int
93
94         // Name used if the info name isn't available. Should be cleared when the
95         // Info does become available.
96         displayName string
97
98         // The bencoded bytes of the info dict. This is actively manipulated if
99         // the info bytes aren't initially available, and we try to fetch them
100         // from peers.
101         metadataBytes []byte
102         // Each element corresponds to the 16KiB metadata pieces. If true, we have
103         // received that piece.
104         metadataCompletedChunks []bool
105         metadataChanged         sync.Cond
106
107         // Set when .Info is obtained.
108         gotMetainfo missinggo.Event
109
110         readers               map[*Reader]struct{}
111         readerNowPieces       bitmap.Bitmap
112         readerReadaheadPieces bitmap.Bitmap
113
114         // The indexes of pieces we want with normal priority, that aren't
115         // currently available.
116         pendingPieces bitmap.Bitmap
117         // A cache of completed piece indices.
118         completedPieces bitmap.Bitmap
119         // Pieces that need to be hashed.
120         piecesQueuedForHash bitmap.Bitmap
121
122         // A pool of piece priorities []int for assignment to new connections.
123         // These "inclinations" are used to give connections preference for
124         // different pieces.
125         connPieceInclinationPool sync.Pool
126         // Torrent-level statistics.
127         stats TorrentStats
128 }
129
130 // Returns a channel that is closed when the Torrent is closed.
131 func (t *Torrent) Closed() <-chan struct{} {
132         return t.closed.LockedChan(&t.cl.mu)
133 }
134
135 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
136 // pending, and half-open peers.
137 func (t *Torrent) KnownSwarm() (ks []Peer) {
138         // Add pending peers to the list
139         for _, peer := range t.peers {
140                 ks = append(ks, peer)
141         }
142
143         // Add half-open peers to the list
144         for _, peer := range t.halfOpen {
145                 ks = append(ks, peer)
146         }
147
148         // Add active peers to the list
149         for conn := range t.conns {
150                 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
151                 if err != nil {
152                         panic(err)
153                 }
154
155                 ip := net.ParseIP(host)
156                 port, err := strconv.Atoi(portString)
157                 if err != nil {
158                         panic(err)
159                 }
160
161                 ks = append(ks, Peer{
162                         Id:     conn.PeerID,
163                         IP:     ip,
164                         Port:   port,
165                         Source: conn.Discovery,
166                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
167                         // > But if we're not connected to them with an encrypted connection, I couldn't say
168                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
169                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
170                         // > connection. It's probably easiest to do the latter for now.
171                         // https://github.com/anacrolix/torrent/pull/188
172                         SupportsEncryption: conn.headerEncrypted,
173                 })
174         }
175
176         return
177 }
178
179 func (t *Torrent) setChunkSize(size pp.Integer) {
180         t.chunkSize = size
181         t.chunkPool = &sync.Pool{
182                 New: func() interface{} {
183                         b := make([]byte, size)
184                         return &b
185                 },
186         }
187 }
188
189 func (t *Torrent) setDisplayName(dn string) {
190         if t.haveInfo() {
191                 return
192         }
193         t.displayName = dn
194 }
195
196 func (t *Torrent) pieceComplete(piece int) bool {
197         return t.completedPieces.Get(piece)
198 }
199
200 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
201         return t.pieces[piece].Storage().Completion()
202 }
203
204 // There's a connection to that address already.
205 func (t *Torrent) addrActive(addr string) bool {
206         if _, ok := t.halfOpen[addr]; ok {
207                 return true
208         }
209         for c := range t.conns {
210                 ra := c.remoteAddr()
211                 if ra == nil {
212                         continue
213                 }
214                 if ra.String() == addr {
215                         return true
216                 }
217         }
218         return false
219 }
220
221 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
222         ret = make([]*connection, 0, len(t.conns))
223         for c := range t.conns {
224                 if !c.closed.IsSet() {
225                         ret = append(ret, c)
226                 }
227         }
228         return
229 }
230
231 func (t *Torrent) addPeer(p Peer) {
232         cl := t.cl
233         cl.openNewConns(t)
234         if len(t.peers) >= cl.config.TorrentPeersHighWater {
235                 return
236         }
237         key := peersKey{string(p.IP), p.Port}
238         if _, ok := t.peers[key]; ok {
239                 return
240         }
241         t.peers[key] = p
242         peersAddedBySource.Add(string(p.Source), 1)
243         cl.openNewConns(t)
244
245 }
246
247 func (t *Torrent) invalidateMetadata() {
248         for i := range t.metadataCompletedChunks {
249                 t.metadataCompletedChunks[i] = false
250         }
251         t.info = nil
252 }
253
254 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
255         if t.haveInfo() {
256                 return
257         }
258         if index >= len(t.metadataCompletedChunks) {
259                 log.Printf("%s: ignoring metadata piece %d", t, index)
260                 return
261         }
262         copy(t.metadataBytes[(1<<14)*index:], data)
263         t.metadataCompletedChunks[index] = true
264 }
265
266 func (t *Torrent) metadataPieceCount() int {
267         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
268 }
269
270 func (t *Torrent) haveMetadataPiece(piece int) bool {
271         if t.haveInfo() {
272                 return (1<<14)*piece < len(t.metadataBytes)
273         } else {
274                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
275         }
276 }
277
278 func (t *Torrent) metadataSizeKnown() bool {
279         return t.metadataBytes != nil
280 }
281
282 func (t *Torrent) metadataSize() int {
283         return len(t.metadataBytes)
284 }
285
286 func infoPieceHashes(info *metainfo.Info) (ret []string) {
287         for i := 0; i < len(info.Pieces); i += sha1.Size {
288                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
289         }
290         return
291 }
292
293 func (t *Torrent) makePieces() {
294         hashes := infoPieceHashes(t.info)
295         t.pieces = make([]Piece, len(hashes))
296         for i, hash := range hashes {
297                 piece := &t.pieces[i]
298                 piece.t = t
299                 piece.index = i
300                 piece.noPendingWrites.L = &piece.pendingWritesMutex
301                 missinggo.CopyExact(piece.hash[:], hash)
302         }
303 }
304
305 // Called when metadata for a torrent becomes available.
306 func (t *Torrent) setInfoBytes(b []byte) error {
307         if t.haveInfo() {
308                 return nil
309         }
310         if metainfo.HashBytes(b) != t.infoHash {
311                 return errors.New("info bytes have wrong hash")
312         }
313         var info metainfo.Info
314         err := bencode.Unmarshal(b, &info)
315         if err != nil {
316                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
317         }
318         err = validateInfo(&info)
319         if err != nil {
320                 return fmt.Errorf("bad info: %s", err)
321         }
322         defer t.updateWantPeersEvent()
323         t.info = &info
324         t.displayName = "" // Save a few bytes lol.
325         t.cl.event.Broadcast()
326         t.gotMetainfo.Set()
327         t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
328         if err != nil {
329                 return fmt.Errorf("error opening torrent storage: %s", err)
330         }
331         t.length = 0
332         for _, f := range t.info.UpvertedFiles() {
333                 t.length += f.Length
334         }
335         t.metadataBytes = b
336         t.metadataCompletedChunks = nil
337         t.makePieces()
338         for conn := range t.conns {
339                 if err := conn.setNumPieces(t.numPieces()); err != nil {
340                         log.Printf("closing connection: %s", err)
341                         conn.Close()
342                 }
343         }
344         for i := range t.pieces {
345                 t.updatePieceCompletion(i)
346                 p := &t.pieces[i]
347                 if !p.storageCompletionOk {
348                         // log.Printf("piece %s completion unknown, queueing check", p)
349                         t.queuePieceCheck(i)
350                 }
351         }
352         return nil
353 }
354
355 func (t *Torrent) haveAllMetadataPieces() bool {
356         if t.haveInfo() {
357                 return true
358         }
359         if t.metadataCompletedChunks == nil {
360                 return false
361         }
362         for _, have := range t.metadataCompletedChunks {
363                 if !have {
364                         return false
365                 }
366         }
367         return true
368 }
369
370 // TODO: Propagate errors to disconnect peer.
371 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
372         if t.haveInfo() {
373                 // We already know the correct metadata size.
374                 return
375         }
376         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
377                 return errors.New("bad size")
378         }
379         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
380                 return
381         }
382         t.metadataBytes = make([]byte, bytes)
383         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
384         t.metadataChanged.Broadcast()
385         for c := range t.conns {
386                 c.requestPendingMetadata()
387         }
388         return
389 }
390
391 // The current working name for the torrent. Either the name in the info dict,
392 // or a display name given such as by the dn value in a magnet link, or "".
393 func (t *Torrent) name() string {
394         if t.haveInfo() {
395                 return t.info.Name
396         }
397         return t.displayName
398 }
399
400 func (t *Torrent) pieceState(index int) (ret PieceState) {
401         p := &t.pieces[index]
402         ret.Priority = t.piecePriority(index)
403         if t.pieceComplete(index) {
404                 ret.Complete = true
405         }
406         if p.queuedForHash() || p.hashing {
407                 ret.Checking = true
408         }
409         if !ret.Complete && t.piecePartiallyDownloaded(index) {
410                 ret.Partial = true
411         }
412         return
413 }
414
415 func (t *Torrent) metadataPieceSize(piece int) int {
416         return metadataPieceSize(len(t.metadataBytes), piece)
417 }
418
419 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
420         d := map[string]int{
421                 "msg_type": msgType,
422                 "piece":    piece,
423         }
424         if data != nil {
425                 d["total_size"] = len(t.metadataBytes)
426         }
427         p, err := bencode.Marshal(d)
428         if err != nil {
429                 panic(err)
430         }
431         return pp.Message{
432                 Type:            pp.Extended,
433                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
434                 ExtendedPayload: append(p, data...),
435         }
436 }
437
438 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
439         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
440                 ret = append(ret, PieceStateRun{
441                         PieceState: el.(PieceState),
442                         Length:     int(count),
443                 })
444         })
445         for index := range t.pieces {
446                 rle.Append(t.pieceState(index), 1)
447         }
448         rle.Flush()
449         return
450 }
451
452 // Produces a small string representing a PieceStateRun.
453 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
454         ret = fmt.Sprintf("%d", psr.Length)
455         ret += func() string {
456                 switch psr.Priority {
457                 case PiecePriorityNext:
458                         return "N"
459                 case PiecePriorityNormal:
460                         return "."
461                 case PiecePriorityReadahead:
462                         return "R"
463                 case PiecePriorityNow:
464                         return "!"
465                 default:
466                         return ""
467                 }
468         }()
469         if psr.Checking {
470                 ret += "H"
471         }
472         if psr.Partial {
473                 ret += "P"
474         }
475         if psr.Complete {
476                 ret += "C"
477         }
478         return
479 }
480
481 func (t *Torrent) writeStatus(w io.Writer) {
482         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
483         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
484         if !t.haveInfo() {
485                 fmt.Fprintf(w, "Metadata have: ")
486                 for _, h := range t.metadataCompletedChunks {
487                         fmt.Fprintf(w, "%c", func() rune {
488                                 if h {
489                                         return 'H'
490                                 } else {
491                                         return '.'
492                                 }
493                         }())
494                 }
495                 fmt.Fprintln(w)
496         }
497         fmt.Fprintf(w, "Piece length: %s\n", func() string {
498                 if t.haveInfo() {
499                         return fmt.Sprint(t.usualPieceSize())
500                 } else {
501                         return "?"
502                 }
503         }())
504         if t.info != nil {
505                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
506                 fmt.Fprint(w, "Piece States:")
507                 for _, psr := range t.pieceStateRuns() {
508                         w.Write([]byte(" "))
509                         w.Write([]byte(pieceStateRunStatusChars(psr)))
510                 }
511                 fmt.Fprintln(w)
512         }
513         fmt.Fprintf(w, "Reader Pieces:")
514         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
515                 fmt.Fprintf(w, " %d:%d", begin, end)
516                 return true
517         })
518         fmt.Fprintln(w)
519
520         fmt.Fprintf(w, "Trackers:\n")
521         func() {
522                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
523                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
524                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
525                         return l.url < r.url
526                 }).([]*trackerScraper) {
527                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
528                 }
529                 tw.Flush()
530         }()
531
532         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
533
534         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
535         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
536         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
537         conns := t.connsAsSlice()
538         slices.Sort(conns, worseConn)
539         for i, c := range conns {
540                 fmt.Fprintf(w, "%2d. ", i+1)
541                 c.WriteStatus(w, t)
542         }
543 }
544
545 func (t *Torrent) haveInfo() bool {
546         return t.info != nil
547 }
548
549 // Returns a run-time generated MetaInfo that includes the info bytes and
550 // announce-list as currently known to the client.
551 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
552         return metainfo.MetaInfo{
553                 CreationDate: time.Now().Unix(),
554                 Comment:      "dynamic metainfo from client",
555                 CreatedBy:    "go.torrent",
556                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
557                 InfoBytes: func() []byte {
558                         if t.haveInfo() {
559                                 return t.metadataBytes
560                         } else {
561                                 return nil
562                         }
563                 }(),
564         }
565 }
566
567 func (t *Torrent) BytesMissing() int64 {
568         t.mu().RLock()
569         defer t.mu().RUnlock()
570         return t.bytesMissingLocked()
571 }
572
573 func (t *Torrent) bytesMissingLocked() int64 {
574         return t.bytesLeft()
575 }
576
577 func (t *Torrent) bytesLeft() (left int64) {
578         bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
579                 p := &t.pieces[piece]
580                 left += int64(p.length() - p.numDirtyBytes())
581                 return true
582         })
583         return
584 }
585
586 // Bytes left to give in tracker announces.
587 func (t *Torrent) bytesLeftAnnounce() uint64 {
588         if t.haveInfo() {
589                 return uint64(t.bytesLeft())
590         } else {
591                 return math.MaxUint64
592         }
593 }
594
595 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
596         if t.pieceComplete(piece) {
597                 return false
598         }
599         if t.pieceAllDirty(piece) {
600                 return false
601         }
602         return t.pieces[piece].hasDirtyChunks()
603 }
604
605 func (t *Torrent) usualPieceSize() int {
606         return int(t.info.PieceLength)
607 }
608
609 func (t *Torrent) numPieces() int {
610         return t.info.NumPieces()
611 }
612
613 func (t *Torrent) numPiecesCompleted() (num int) {
614         return t.completedPieces.Len()
615 }
616
617 func (t *Torrent) close() (err error) {
618         t.closed.Set()
619         if t.storage != nil {
620                 t.storageLock.Lock()
621                 t.storage.Close()
622                 t.storageLock.Unlock()
623         }
624         for conn := range t.conns {
625                 conn.Close()
626         }
627         t.cl.event.Broadcast()
628         t.pieceStateChanges.Close()
629         t.updateWantPeersEvent()
630         return
631 }
632
633 func (t *Torrent) requestOffset(r request) int64 {
634         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
635 }
636
637 // Return the request that would include the given offset into the torrent
638 // data. Returns !ok if there is no such request.
639 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
640         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
641 }
642
643 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
644         tr := perf.NewTimer()
645
646         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
647         if err == nil && n != len(data) {
648                 err = io.ErrShortWrite
649         }
650         if err == nil {
651                 tr.Mark("write chunk")
652         }
653         return
654 }
655
656 func (t *Torrent) bitfield() (bf []bool) {
657         bf = make([]bool, t.numPieces())
658         t.completedPieces.IterTyped(func(piece int) (again bool) {
659                 bf[piece] = true
660                 return true
661         })
662         return
663 }
664
665 func (t *Torrent) pieceNumChunks(piece int) int {
666         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
667 }
668
669 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
670         t.pieces[pieceIndex].dirtyChunks.Clear()
671 }
672
673 type Peer struct {
674         Id     [20]byte
675         IP     net.IP
676         Port   int
677         Source peerSource
678         // Peer is known to support encryption.
679         SupportsEncryption bool
680 }
681
682 func (t *Torrent) pieceLength(piece int) pp.Integer {
683         if piece == t.numPieces()-1 {
684                 ret := pp.Integer(t.length % t.info.PieceLength)
685                 if ret != 0 {
686                         return ret
687                 }
688         }
689         return pp.Integer(t.info.PieceLength)
690 }
691
692 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
693         hash := pieceHash.New()
694         p := &t.pieces[piece]
695         p.waitNoPendingWrites()
696         ip := t.info.Piece(piece)
697         pl := ip.Length()
698         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
699         if n == pl {
700                 missinggo.CopyExact(&ret, hash.Sum(nil))
701                 return
702         }
703         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
704                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
705         }
706         return
707 }
708
709 func (t *Torrent) haveAnyPieces() bool {
710         for i := range t.pieces {
711                 if t.pieceComplete(i) {
712                         return true
713                 }
714         }
715         return false
716 }
717
718 func (t *Torrent) havePiece(index int) bool {
719         return t.haveInfo() && t.pieceComplete(index)
720 }
721
722 func (t *Torrent) haveChunk(r request) (ret bool) {
723         // defer func() {
724         //      log.Println("have chunk", r, ret)
725         // }()
726         if !t.haveInfo() {
727                 return false
728         }
729         if t.pieceComplete(int(r.Index)) {
730                 return true
731         }
732         p := &t.pieces[r.Index]
733         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
734 }
735
736 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
737         return int(cs.Begin / chunkSize)
738 }
739
740 func (t *Torrent) wantPiece(r request) bool {
741         if !t.wantPieceIndex(int(r.Index)) {
742                 return false
743         }
744         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
745                 return true
746         }
747         // TODO: What about pieces that were wanted, but aren't now, and aren't
748         // completed either? That used to be done here.
749         return false
750 }
751
752 func (t *Torrent) wantPieceIndex(index int) bool {
753         if !t.haveInfo() {
754                 return false
755         }
756         if index < 0 || index >= t.numPieces() {
757                 return false
758         }
759         p := &t.pieces[index]
760         if p.queuedForHash() {
761                 return false
762         }
763         if p.hashing {
764                 return false
765         }
766         if t.pieceComplete(index) {
767                 return false
768         }
769         if t.pendingPieces.Contains(index) {
770                 return true
771         }
772         return !t.forReaderOffsetPieces(func(begin, end int) bool {
773                 return index < begin || index >= end
774         })
775 }
776
777 // The worst connection is one that hasn't been sent, or sent anything useful
778 // for the longest. A bad connection is one that usually sends us unwanted
779 // pieces, or has been in worser half of the established connections for more
780 // than a minute.
781 func (t *Torrent) worstBadConn() *connection {
782         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
783         heap.Init(&wcs)
784         for wcs.Len() != 0 {
785                 c := heap.Pop(&wcs).(*connection)
786                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
787                         return c
788                 }
789                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
790                         // Give connections 1 minute to prove themselves.
791                         if time.Since(c.completedHandshake) > time.Minute {
792                                 return c
793                         }
794                 }
795         }
796         return nil
797 }
798
799 type PieceStateChange struct {
800         Index int
801         PieceState
802 }
803
804 func (t *Torrent) publishPieceChange(piece int) {
805         cur := t.pieceState(piece)
806         p := &t.pieces[piece]
807         if cur != p.publicPieceState {
808                 p.publicPieceState = cur
809                 t.pieceStateChanges.Publish(PieceStateChange{
810                         piece,
811                         cur,
812                 })
813         }
814 }
815
816 func (t *Torrent) pieceNumPendingChunks(piece int) int {
817         if t.pieceComplete(piece) {
818                 return 0
819         }
820         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
821 }
822
823 func (t *Torrent) pieceAllDirty(piece int) bool {
824         return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
825 }
826
827 func (t *Torrent) readersChanged() {
828         t.updateReaderPieces()
829         t.updateAllPiecePriorities()
830 }
831
832 func (t *Torrent) updateReaderPieces() {
833         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
834 }
835
836 func (t *Torrent) readerPosChanged(from, to pieceRange) {
837         if from == to {
838                 return
839         }
840         t.updateReaderPieces()
841         // Order the ranges, high and low.
842         l, h := from, to
843         if l.begin > h.begin {
844                 l, h = h, l
845         }
846         if l.end < h.begin {
847                 // Two distinct ranges.
848                 t.updatePiecePriorities(l.begin, l.end)
849                 t.updatePiecePriorities(h.begin, h.end)
850         } else {
851                 // Ranges overlap.
852                 end := l.end
853                 if h.end > end {
854                         end = h.end
855                 }
856                 t.updatePiecePriorities(l.begin, end)
857         }
858 }
859
860 func (t *Torrent) maybeNewConns() {
861         // Tickle the accept routine.
862         t.cl.event.Broadcast()
863         t.openNewConns()
864 }
865
866 func (t *Torrent) piecePriorityChanged(piece int) {
867         for c := range t.conns {
868                 if c.updatePiecePriority(piece) {
869                         c.updateRequests()
870                 }
871         }
872         t.maybeNewConns()
873         t.publishPieceChange(piece)
874 }
875
876 func (t *Torrent) updatePiecePriority(piece int) {
877         p := &t.pieces[piece]
878         newPrio := t.piecePriorityUncached(piece)
879         if newPrio == p.priority {
880                 return
881         }
882         p.priority = newPrio
883         t.piecePriorityChanged(piece)
884 }
885
886 func (t *Torrent) updateAllPiecePriorities() {
887         t.updatePiecePriorities(0, len(t.pieces))
888 }
889
890 // Update all piece priorities in one hit. This function should have the same
891 // output as updatePiecePriority, but across all pieces.
892 func (t *Torrent) updatePiecePriorities(begin, end int) {
893         for i := begin; i < end; i++ {
894                 t.updatePiecePriority(i)
895         }
896 }
897
898 // Returns the range of pieces [begin, end) that contains the extent of bytes.
899 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
900         if off >= t.length {
901                 return
902         }
903         if off < 0 {
904                 size += off
905                 off = 0
906         }
907         if size <= 0 {
908                 return
909         }
910         begin = int(off / t.info.PieceLength)
911         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
912         if end > t.info.NumPieces() {
913                 end = t.info.NumPieces()
914         }
915         return
916 }
917
918 // Returns true if all iterations complete without breaking. Returns the read
919 // regions for all readers. The reader regions should not be merged as some
920 // callers depend on this method to enumerate readers.
921 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
922         for r := range t.readers {
923                 p := r.pieces
924                 if p.begin >= p.end {
925                         continue
926                 }
927                 if !f(p.begin, p.end) {
928                         return false
929                 }
930         }
931         return true
932 }
933
934 func (t *Torrent) piecePriority(piece int) piecePriority {
935         if !t.haveInfo() {
936                 return PiecePriorityNone
937         }
938         return t.pieces[piece].priority
939 }
940
941 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
942         if t.pieceComplete(piece) {
943                 return PiecePriorityNone
944         }
945         if t.readerNowPieces.Contains(piece) {
946                 return PiecePriorityNow
947         }
948         // if t.readerNowPieces.Contains(piece - 1) {
949         //      return PiecePriorityNext
950         // }
951         if t.readerReadaheadPieces.Contains(piece) {
952                 return PiecePriorityReadahead
953         }
954         if t.pendingPieces.Contains(piece) {
955                 return PiecePriorityNormal
956         }
957         return PiecePriorityNone
958 }
959
960 func (t *Torrent) pendPiece(piece int) {
961         if t.pendingPieces.Contains(piece) {
962                 return
963         }
964         if t.havePiece(piece) {
965                 return
966         }
967         t.pendingPieces.Add(piece)
968         t.updatePiecePriority(piece)
969 }
970
971 func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) {
972         t.pendingPieces.Sub(unpend)
973         unpend.IterTyped(func(piece int) (again bool) {
974                 t.updatePiecePriority(piece)
975                 return true
976         })
977 }
978
979 func (t *Torrent) pendPieceRange(begin, end int) {
980         for i := begin; i < end; i++ {
981                 t.pendPiece(i)
982         }
983 }
984
985 func (t *Torrent) unpendPieceRange(begin, end int) {
986         var bm bitmap.Bitmap
987         bm.AddRange(begin, end)
988         t.unpendPieces(bm)
989 }
990
991 func (t *Torrent) pendRequest(req request) {
992         ci := chunkIndex(req.chunkSpec, t.chunkSize)
993         t.pieces[req.Index].pendChunkIndex(ci)
994 }
995
996 func (t *Torrent) pieceCompletionChanged(piece int) {
997         t.cl.event.Broadcast()
998         if t.pieceComplete(piece) {
999                 t.onPieceCompleted(piece)
1000         } else {
1001                 t.onIncompletePiece(piece)
1002         }
1003         t.updatePiecePriority(piece)
1004 }
1005
1006 func (t *Torrent) openNewConns() {
1007         t.cl.openNewConns(t)
1008 }
1009
1010 func (t *Torrent) getConnPieceInclination() []int {
1011         _ret := t.connPieceInclinationPool.Get()
1012         if _ret == nil {
1013                 pieceInclinationsNew.Add(1)
1014                 return rand.Perm(t.numPieces())
1015         }
1016         pieceInclinationsReused.Add(1)
1017         return *_ret.(*[]int)
1018 }
1019
1020 func (t *Torrent) putPieceInclination(pi []int) {
1021         t.connPieceInclinationPool.Put(&pi)
1022         pieceInclinationsPut.Add(1)
1023 }
1024
1025 func (t *Torrent) updatePieceCompletion(piece int) {
1026         pcu := t.pieceCompleteUncached(piece)
1027         p := &t.pieces[piece]
1028         changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1029         p.storageCompletionOk = pcu.Ok
1030         t.completedPieces.Set(piece, pcu.Complete)
1031         if changed {
1032                 t.pieceCompletionChanged(piece)
1033         }
1034 }
1035
1036 // Non-blocking read. Client lock is not required.
1037 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1038         p := &t.pieces[off/t.info.PieceLength]
1039         p.waitNoPendingWrites()
1040         return p.Storage().ReadAt(b, off-p.Info().Offset())
1041 }
1042
1043 func (t *Torrent) updateAllPieceCompletions() {
1044         for i := range iter.N(t.numPieces()) {
1045                 t.updatePieceCompletion(i)
1046         }
1047 }
1048
1049 // Returns an error if the metadata was completed, but couldn't be set for
1050 // some reason. Blame it on the last peer to contribute.
1051 func (t *Torrent) maybeCompleteMetadata() error {
1052         if t.haveInfo() {
1053                 // Nothing to do.
1054                 return nil
1055         }
1056         if !t.haveAllMetadataPieces() {
1057                 // Don't have enough metadata pieces.
1058                 return nil
1059         }
1060         err := t.setInfoBytes(t.metadataBytes)
1061         if err != nil {
1062                 t.invalidateMetadata()
1063                 return fmt.Errorf("error setting info bytes: %s", err)
1064         }
1065         if t.cl.config.Debug {
1066                 log.Printf("%s: got metadata from peers", t)
1067         }
1068         return nil
1069 }
1070
1071 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1072         t.forReaderOffsetPieces(func(begin, end int) bool {
1073                 ret.AddRange(begin, end)
1074                 return true
1075         })
1076         return
1077 }
1078
1079 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1080         t.forReaderOffsetPieces(func(begin, end int) bool {
1081                 if end > begin {
1082                         now.Add(begin)
1083                         readahead.AddRange(begin+1, end)
1084                 }
1085                 return true
1086         })
1087         return
1088 }
1089
1090 func (t *Torrent) needData() bool {
1091         if t.closed.IsSet() {
1092                 return false
1093         }
1094         if !t.haveInfo() {
1095                 return true
1096         }
1097         if t.pendingPieces.Len() != 0 {
1098                 return true
1099         }
1100         // Read as "not all complete".
1101         return !t.readerPieces().IterTyped(func(piece int) bool {
1102                 return t.pieceComplete(piece)
1103         })
1104 }
1105
1106 func appendMissingStrings(old, new []string) (ret []string) {
1107         ret = old
1108 new:
1109         for _, n := range new {
1110                 for _, o := range old {
1111                         if o == n {
1112                                 continue new
1113                         }
1114                 }
1115                 ret = append(ret, n)
1116         }
1117         return
1118 }
1119
1120 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1121         ret = existing
1122         for minNumTiers > len(ret) {
1123                 ret = append(ret, nil)
1124         }
1125         return
1126 }
1127
1128 func (t *Torrent) addTrackers(announceList [][]string) {
1129         fullAnnounceList := &t.metainfo.AnnounceList
1130         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1131         for tierIndex, trackerURLs := range announceList {
1132                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1133         }
1134         t.startMissingTrackerScrapers()
1135         t.updateWantPeersEvent()
1136 }
1137
1138 // Don't call this before the info is available.
1139 func (t *Torrent) bytesCompleted() int64 {
1140         if !t.haveInfo() {
1141                 return 0
1142         }
1143         return t.info.TotalLength() - t.bytesLeft()
1144 }
1145
1146 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1147         t.cl.mu.Lock()
1148         defer t.cl.mu.Unlock()
1149         return t.setInfoBytes(b)
1150 }
1151
1152 // Returns true if connection is removed from torrent.Conns.
1153 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1154         _, ret = t.conns[c]
1155         delete(t.conns, c)
1156         return
1157 }
1158
1159 func (t *Torrent) dropConnection(c *connection) {
1160         t.cl.event.Broadcast()
1161         c.Close()
1162         if t.deleteConnection(c) {
1163                 t.openNewConns()
1164         }
1165 }
1166
1167 func (t *Torrent) wantPeers() bool {
1168         if t.closed.IsSet() {
1169                 return false
1170         }
1171         if len(t.peers) > t.cl.config.TorrentPeersLowWater {
1172                 return false
1173         }
1174         return t.needData() || t.seeding()
1175 }
1176
1177 func (t *Torrent) updateWantPeersEvent() {
1178         if t.wantPeers() {
1179                 t.wantPeersEvent.Set()
1180         } else {
1181                 t.wantPeersEvent.Clear()
1182         }
1183 }
1184
1185 // Returns whether the client should make effort to seed the torrent.
1186 func (t *Torrent) seeding() bool {
1187         cl := t.cl
1188         if t.closed.IsSet() {
1189                 return false
1190         }
1191         if cl.config.NoUpload {
1192                 return false
1193         }
1194         if !cl.config.Seed {
1195                 return false
1196         }
1197         if cl.config.DisableAggressiveUpload && t.needData() {
1198                 return false
1199         }
1200         return true
1201 }
1202
1203 func (t *Torrent) startScrapingTracker(url string) {
1204         if url == "" {
1205                 return
1206         }
1207         if _, ok := t.trackerAnnouncers[url]; ok {
1208                 return
1209         }
1210         newAnnouncer := &trackerScraper{
1211                 url: url,
1212                 t:   t,
1213         }
1214         if t.trackerAnnouncers == nil {
1215                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1216         }
1217         t.trackerAnnouncers[url] = newAnnouncer
1218         go newAnnouncer.Run()
1219 }
1220
1221 // Adds and starts tracker scrapers for tracker URLs that aren't already
1222 // running.
1223 func (t *Torrent) startMissingTrackerScrapers() {
1224         if t.cl.config.DisableTrackers {
1225                 return
1226         }
1227         t.startScrapingTracker(t.metainfo.Announce)
1228         for _, tier := range t.metainfo.AnnounceList {
1229                 for _, url := range tier {
1230                         t.startScrapingTracker(url)
1231                 }
1232         }
1233 }
1234
1235 // Returns an AnnounceRequest with fields filled out to defaults and current
1236 // values.
1237 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1238         return tracker.AnnounceRequest{
1239                 Event:    tracker.None,
1240                 NumWant:  -1,
1241                 Port:     uint16(t.cl.incomingPeerPort()),
1242                 PeerId:   t.cl.peerID,
1243                 InfoHash: t.infoHash,
1244                 Left:     t.bytesLeftAnnounce(),
1245         }
1246 }
1247
1248 // Adds peers revealed in an announce until the announce ends, or we have
1249 // enough peers.
1250 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1251         cl := t.cl
1252         // Count all the unique addresses we got during this announce.
1253         allAddrs := make(map[string]struct{})
1254         for {
1255                 select {
1256                 case v, ok := <-pvs:
1257                         if !ok {
1258                                 return
1259                         }
1260                         addPeers := make([]Peer, 0, len(v.Peers))
1261                         for _, cp := range v.Peers {
1262                                 if cp.Port == 0 {
1263                                         // Can't do anything with this.
1264                                         continue
1265                                 }
1266                                 addPeers = append(addPeers, Peer{
1267                                         IP:     cp.IP[:],
1268                                         Port:   cp.Port,
1269                                         Source: peerSourceDHTGetPeers,
1270                                 })
1271                                 key := (&net.UDPAddr{
1272                                         IP:   cp.IP[:],
1273                                         Port: cp.Port,
1274                                 }).String()
1275                                 allAddrs[key] = struct{}{}
1276                         }
1277                         cl.mu.Lock()
1278                         t.addPeers(addPeers)
1279                         numPeers := len(t.peers)
1280                         cl.mu.Unlock()
1281                         if numPeers >= cl.config.TorrentPeersHighWater {
1282                                 return
1283                         }
1284                 case <-t.closed.LockedChan(&cl.mu):
1285                         return
1286                 }
1287         }
1288 }
1289
1290 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1291         cl := t.cl
1292         ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1293         if err != nil {
1294                 return
1295         }
1296         t.consumeDHTAnnounce(ps.Peers)
1297         ps.Close()
1298         return
1299 }
1300
1301 func (t *Torrent) dhtAnnouncer() {
1302         cl := t.cl
1303         for {
1304                 select {
1305                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1306                 case <-t.closed.LockedChan(&cl.mu):
1307                         return
1308                 }
1309                 err := t.announceDHT(true)
1310                 func() {
1311                         cl.mu.Lock()
1312                         defer cl.mu.Unlock()
1313                         if err == nil {
1314                                 t.numDHTAnnounces++
1315                         } else {
1316                                 log.Printf("error announcing %q to DHT: %s", t, err)
1317                         }
1318                 }()
1319                 select {
1320                 case <-t.closed.LockedChan(&cl.mu):
1321                         return
1322                 case <-time.After(5 * time.Minute):
1323                 }
1324         }
1325 }
1326
1327 func (t *Torrent) addPeers(peers []Peer) {
1328         for _, p := range peers {
1329                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1330                         continue
1331                 }
1332                 t.addPeer(p)
1333         }
1334 }
1335
1336 func (t *Torrent) Stats() TorrentStats {
1337         t.cl.mu.Lock()
1338         defer t.cl.mu.Unlock()
1339
1340         t.stats.ActivePeers = len(t.conns)
1341         t.stats.HalfOpenPeers = len(t.halfOpen)
1342         t.stats.PendingPeers = len(t.peers)
1343         t.stats.TotalPeers = t.numTotalPeers()
1344
1345         return t.stats
1346 }
1347
1348 // The total number of peers in the torrent.
1349 func (t *Torrent) numTotalPeers() int {
1350         peers := make(map[string]struct{})
1351         for conn := range t.conns {
1352                 ra := conn.conn.RemoteAddr()
1353                 if ra == nil {
1354                         // It's been closed and doesn't support RemoteAddr.
1355                         continue
1356                 }
1357                 peers[ra.String()] = struct{}{}
1358         }
1359         for addr := range t.halfOpen {
1360                 peers[addr] = struct{}{}
1361         }
1362         for _, peer := range t.peers {
1363                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1364         }
1365         return len(peers)
1366 }
1367
1368 // Returns true if the connection is added.
1369 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1370         if t.cl.closed.IsSet() {
1371                 return false
1372         }
1373         if !t.wantConns() {
1374                 return false
1375         }
1376         for c0 := range t.conns {
1377                 if c.PeerID == c0.PeerID {
1378                         // Already connected to a client with that ID.
1379                         duplicateClientConns.Add(1)
1380                         lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1381                         // Retain the connection from initiated from lower peer ID to
1382                         // higher.
1383                         if outgoing == lower {
1384                                 // Close the other one.
1385                                 c0.Close()
1386                                 // TODO: Is it safe to delete from the map while we're
1387                                 // iterating over it?
1388                                 t.deleteConnection(c0)
1389                         } else {
1390                                 // Abandon this one.
1391                                 return false
1392                         }
1393                 }
1394         }
1395         if len(t.conns) >= t.maxEstablishedConns {
1396                 c := t.worstBadConn()
1397                 if c == nil {
1398                         return false
1399                 }
1400                 if t.cl.config.Debug && missinggo.CryHeard() {
1401                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1402                 }
1403                 c.Close()
1404                 t.deleteConnection(c)
1405         }
1406         if len(t.conns) >= t.maxEstablishedConns {
1407                 panic(len(t.conns))
1408         }
1409         if c.t != nil {
1410                 panic("connection already associated with a torrent")
1411         }
1412         // Reconcile bytes transferred before connection was associated with a
1413         // torrent.
1414         t.stats.wroteBytes(c.stats.BytesWritten)
1415         t.stats.readBytes(c.stats.BytesRead)
1416         c.t = t
1417         t.conns[c] = struct{}{}
1418         return true
1419 }
1420
1421 func (t *Torrent) wantConns() bool {
1422         if !t.networkingEnabled {
1423                 return false
1424         }
1425         if t.closed.IsSet() {
1426                 return false
1427         }
1428         if !t.seeding() && !t.needData() {
1429                 return false
1430         }
1431         if len(t.conns) < t.maxEstablishedConns {
1432                 return true
1433         }
1434         return t.worstBadConn() != nil
1435 }
1436
1437 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1438         t.cl.mu.Lock()
1439         defer t.cl.mu.Unlock()
1440         oldMax = t.maxEstablishedConns
1441         t.maxEstablishedConns = max
1442         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1443         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1444                 t.dropConnection(wcs.Pop().(*connection))
1445         }
1446         t.openNewConns()
1447         return oldMax
1448 }
1449
1450 func (t *Torrent) mu() missinggo.RWLocker {
1451         return &t.cl.mu
1452 }
1453
1454 func (t *Torrent) pieceHashed(piece int, correct bool) {
1455         if t.closed.IsSet() {
1456                 return
1457         }
1458         p := &t.pieces[piece]
1459         touchers := t.reapPieceTouchers(piece)
1460         if p.everHashed {
1461                 // Don't score the first time a piece is hashed, it could be an
1462                 // initial check.
1463                 if correct {
1464                         pieceHashedCorrect.Add(1)
1465                 } else {
1466                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1467                         pieceHashedNotCorrect.Add(1)
1468                 }
1469         }
1470         p.everHashed = true
1471         if correct {
1472                 for _, c := range touchers {
1473                         c.goodPiecesDirtied++
1474                 }
1475                 err := p.Storage().MarkComplete()
1476                 if err != nil {
1477                         log.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1478                 }
1479                 t.updatePieceCompletion(piece)
1480         } else {
1481                 if len(touchers) != 0 {
1482                         for _, c := range touchers {
1483                                 // Y u do dis peer?!
1484                                 c.badPiecesDirtied++
1485                         }
1486                         slices.Sort(touchers, connLessTrusted)
1487                         if t.cl.config.Debug {
1488                                 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1489                                         for _, c := range touchers {
1490                                                 ret = append(ret, c.netGoodPiecesDirtied())
1491                                         }
1492                                         return
1493                                 }())
1494                         }
1495                         c := touchers[0]
1496                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1497                         c.Drop()
1498                 }
1499                 t.onIncompletePiece(piece)
1500         }
1501 }
1502
1503 func (t *Torrent) cancelRequestsForPiece(piece int) {
1504         // TODO: Make faster
1505         for cn := range t.conns {
1506                 cn.tickleWriter()
1507         }
1508 }
1509
1510 func (t *Torrent) onPieceCompleted(piece int) {
1511         t.pendingPieces.Remove(piece)
1512         t.pendAllChunkSpecs(piece)
1513         t.cancelRequestsForPiece(piece)
1514         for conn := range t.conns {
1515                 conn.Have(piece)
1516         }
1517 }
1518
1519 func (t *Torrent) onIncompletePiece(piece int) {
1520         if t.pieceAllDirty(piece) {
1521                 t.pendAllChunkSpecs(piece)
1522         }
1523         if !t.wantPieceIndex(piece) {
1524                 return
1525         }
1526         // We could drop any connections that we told we have a piece that we
1527         // don't here. But there's a test failure, and it seems clients don't care
1528         // if you request pieces that you already claim to have. Pruning bad
1529         // connections might just remove any connections that aren't treating us
1530         // favourably anyway.
1531
1532         // for c := range t.conns {
1533         //      if c.sentHave(piece) {
1534         //              c.Drop()
1535         //      }
1536         // }
1537         for conn := range t.conns {
1538                 if conn.PeerHasPiece(piece) {
1539                         conn.updateRequests()
1540                 }
1541         }
1542 }
1543
1544 func (t *Torrent) verifyPiece(piece int) {
1545         cl := t.cl
1546         cl.mu.Lock()
1547         defer cl.mu.Unlock()
1548         p := &t.pieces[piece]
1549         defer func() {
1550                 p.numVerifies++
1551                 cl.event.Broadcast()
1552         }()
1553         for p.hashing || t.storage == nil {
1554                 cl.event.Wait()
1555         }
1556         if !p.t.piecesQueuedForHash.Remove(piece) {
1557                 panic("piece was not queued")
1558         }
1559         if t.closed.IsSet() || t.pieceComplete(piece) {
1560                 t.updatePiecePriority(piece)
1561                 return
1562         }
1563         p.hashing = true
1564         t.publishPieceChange(piece)
1565         t.storageLock.RLock()
1566         cl.mu.Unlock()
1567         sum := t.hashPiece(piece)
1568         t.storageLock.RUnlock()
1569         cl.mu.Lock()
1570         p.hashing = false
1571         t.pieceHashed(piece, sum == p.hash)
1572         t.publishPieceChange(piece)
1573 }
1574
1575 // Return the connections that touched a piece, and clear the entry while
1576 // doing it.
1577 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1578         for c := range t.conns {
1579                 if _, ok := c.peerTouchedPieces[piece]; ok {
1580                         ret = append(ret, c)
1581                         delete(c.peerTouchedPieces, piece)
1582                 }
1583         }
1584         return
1585 }
1586
1587 func (t *Torrent) connsAsSlice() (ret []*connection) {
1588         for c := range t.conns {
1589                 ret = append(ret, c)
1590         }
1591         return
1592 }
1593
1594 // Currently doesn't really queue, but should in the future.
1595 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1596         piece := &t.pieces[pieceIndex]
1597         if piece.queuedForHash() {
1598                 return
1599         }
1600         t.piecesQueuedForHash.Add(pieceIndex)
1601         t.publishPieceChange(pieceIndex)
1602         go t.verifyPiece(pieceIndex)
1603 }
1604
1605 func (t *Torrent) VerifyData() {
1606         for i := range iter.N(t.NumPieces()) {
1607                 t.Piece(i).VerifyData()
1608         }
1609 }