]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
301e2d6d67c4c8406f099add71465e47a48832dd
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math"
11         "math/rand"
12         "net"
13         "os"
14         "strconv"
15         "sync"
16         "text/tabwriter"
17         "time"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/bitmap"
22         "github.com/anacrolix/missinggo/perf"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/missinggo/slices"
25         "github.com/bradfitz/iter"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/metainfo"
29         pp "github.com/anacrolix/torrent/peer_protocol"
30         "github.com/anacrolix/torrent/storage"
31         "github.com/anacrolix/torrent/tracker"
32 )
33
34 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
35         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
36 }
37
38 type peersKey struct {
39         IPBytes string
40         Port    int
41 }
42
43 // Maintains state of torrent within a Client.
44 type Torrent struct {
45         cl *Client
46
47         networkingEnabled bool
48         requestStrategy   int
49
50         closed   missinggo.Event
51         infoHash metainfo.Hash
52         pieces   []Piece
53         // Values are the piece indices that changed.
54         pieceStateChanges *pubsub.PubSub
55         // The size of chunks to request from peers over the wire. This is
56         // normally 16KiB by convention these days.
57         chunkSize pp.Integer
58         chunkPool *sync.Pool
59         // Total length of the torrent in bytes. Stored because it's not O(1) to
60         // get this from the info dict.
61         length *int64
62
63         // The storage to open when the info dict becomes available.
64         storageOpener *storage.Client
65         // Storage for torrent data.
66         storage *storage.Torrent
67         // Read-locked for using storage, and write-locked for Closing.
68         storageLock sync.RWMutex
69
70         metainfo metainfo.MetaInfo
71
72         // The info dict. nil if we don't have it (yet).
73         info *metainfo.Info
74
75         // Active peer connections, running message stream loops.
76         conns               map[*connection]struct{}
77         maxEstablishedConns int
78         // Set of addrs to which we're attempting to connect. Connections are
79         // half-open until all handshakes are completed.
80         halfOpen    map[string]Peer
81         fastestConn *connection
82
83         // Reserve of peers to connect to. A peer can be both here and in the
84         // active connections if were told about the peer after connecting with
85         // them. That encourages us to reconnect to peers that are well known in
86         // the swarm.
87         peers          map[peersKey]Peer
88         wantPeersEvent missinggo.Event
89         // An announcer for each tracker URL.
90         trackerAnnouncers map[string]*trackerScraper
91         // How many times we've initiated a DHT announce. TODO: Move into stats.
92         numDHTAnnounces int
93
94         // Name used if the info name isn't available. Should be cleared when the
95         // Info does become available.
96         displayName string
97
98         // The bencoded bytes of the info dict. This is actively manipulated if
99         // the info bytes aren't initially available, and we try to fetch them
100         // from peers.
101         metadataBytes []byte
102         // Each element corresponds to the 16KiB metadata pieces. If true, we have
103         // received that piece.
104         metadataCompletedChunks []bool
105         metadataChanged         sync.Cond
106
107         // Set when .Info is obtained.
108         gotMetainfo missinggo.Event
109
110         readers               map[*reader]struct{}
111         readerNowPieces       bitmap.Bitmap
112         readerReadaheadPieces bitmap.Bitmap
113
114         // The indexes of pieces we want with normal priority, that aren't
115         // currently available.
116         pendingPieces bitmap.Bitmap
117         // A cache of completed piece indices.
118         completedPieces bitmap.Bitmap
119         // Pieces that need to be hashed.
120         piecesQueuedForHash bitmap.Bitmap
121
122         // A pool of piece priorities []int for assignment to new connections.
123         // These "inclinations" are used to give connections preference for
124         // different pieces.
125         connPieceInclinationPool sync.Pool
126         // Torrent-level statistics.
127         stats TorrentStats
128 }
129
130 // Returns a channel that is closed when the Torrent is closed.
131 func (t *Torrent) Closed() <-chan struct{} {
132         return t.closed.LockedChan(&t.cl.mu)
133 }
134
135 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
136 // pending, and half-open peers.
137 func (t *Torrent) KnownSwarm() (ks []Peer) {
138         // Add pending peers to the list
139         for _, peer := range t.peers {
140                 ks = append(ks, peer)
141         }
142
143         // Add half-open peers to the list
144         for _, peer := range t.halfOpen {
145                 ks = append(ks, peer)
146         }
147
148         // Add active peers to the list
149         for conn := range t.conns {
150                 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
151                 if err != nil {
152                         panic(err)
153                 }
154
155                 ip := net.ParseIP(host)
156                 port, err := strconv.Atoi(portString)
157                 if err != nil {
158                         panic(err)
159                 }
160
161                 ks = append(ks, Peer{
162                         Id:     conn.PeerID,
163                         IP:     ip,
164                         Port:   port,
165                         Source: conn.Discovery,
166                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
167                         // > But if we're not connected to them with an encrypted connection, I couldn't say
168                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
169                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
170                         // > connection. It's probably easiest to do the latter for now.
171                         // https://github.com/anacrolix/torrent/pull/188
172                         SupportsEncryption: conn.headerEncrypted,
173                 })
174         }
175
176         return
177 }
178
179 func (t *Torrent) setChunkSize(size pp.Integer) {
180         t.chunkSize = size
181         t.chunkPool = &sync.Pool{
182                 New: func() interface{} {
183                         b := make([]byte, size)
184                         return &b
185                 },
186         }
187 }
188
189 func (t *Torrent) setDisplayName(dn string) {
190         if t.haveInfo() {
191                 return
192         }
193         t.displayName = dn
194 }
195
196 func (t *Torrent) pieceComplete(piece int) bool {
197         return t.completedPieces.Get(piece)
198 }
199
200 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
201         return t.pieces[piece].Storage().Completion()
202 }
203
204 // There's a connection to that address already.
205 func (t *Torrent) addrActive(addr string) bool {
206         if _, ok := t.halfOpen[addr]; ok {
207                 return true
208         }
209         for c := range t.conns {
210                 ra := c.remoteAddr()
211                 if ra == nil {
212                         continue
213                 }
214                 if ra.String() == addr {
215                         return true
216                 }
217         }
218         return false
219 }
220
221 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
222         ret = make([]*connection, 0, len(t.conns))
223         for c := range t.conns {
224                 if !c.closed.IsSet() {
225                         ret = append(ret, c)
226                 }
227         }
228         return
229 }
230
231 func (t *Torrent) addPeer(p Peer) {
232         cl := t.cl
233         cl.openNewConns(t)
234         if len(t.peers) >= cl.config.TorrentPeersHighWater {
235                 return
236         }
237         key := peersKey{string(p.IP), p.Port}
238         if _, ok := t.peers[key]; ok {
239                 return
240         }
241         t.peers[key] = p
242         peersAddedBySource.Add(string(p.Source), 1)
243         cl.openNewConns(t)
244
245 }
246
247 func (t *Torrent) invalidateMetadata() {
248         for i := range t.metadataCompletedChunks {
249                 t.metadataCompletedChunks[i] = false
250         }
251         t.info = nil
252 }
253
254 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
255         if t.haveInfo() {
256                 return
257         }
258         if index >= len(t.metadataCompletedChunks) {
259                 log.Printf("%s: ignoring metadata piece %d", t, index)
260                 return
261         }
262         copy(t.metadataBytes[(1<<14)*index:], data)
263         t.metadataCompletedChunks[index] = true
264 }
265
266 func (t *Torrent) metadataPieceCount() int {
267         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
268 }
269
270 func (t *Torrent) haveMetadataPiece(piece int) bool {
271         if t.haveInfo() {
272                 return (1<<14)*piece < len(t.metadataBytes)
273         } else {
274                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
275         }
276 }
277
278 func (t *Torrent) metadataSizeKnown() bool {
279         return t.metadataBytes != nil
280 }
281
282 func (t *Torrent) metadataSize() int {
283         return len(t.metadataBytes)
284 }
285
286 func infoPieceHashes(info *metainfo.Info) (ret []string) {
287         for i := 0; i < len(info.Pieces); i += sha1.Size {
288                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
289         }
290         return
291 }
292
293 func (t *Torrent) makePieces() {
294         hashes := infoPieceHashes(t.info)
295         t.pieces = make([]Piece, len(hashes))
296         for i, hash := range hashes {
297                 piece := &t.pieces[i]
298                 piece.t = t
299                 piece.index = i
300                 piece.noPendingWrites.L = &piece.pendingWritesMutex
301                 missinggo.CopyExact(piece.hash[:], hash)
302         }
303 }
304
305 // Called when metadata for a torrent becomes available.
306 func (t *Torrent) setInfoBytes(b []byte) error {
307         if t.haveInfo() {
308                 return nil
309         }
310         if metainfo.HashBytes(b) != t.infoHash {
311                 return errors.New("info bytes have wrong hash")
312         }
313         var info metainfo.Info
314         err := bencode.Unmarshal(b, &info)
315         if err != nil {
316                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
317         }
318         err = validateInfo(&info)
319         if err != nil {
320                 return fmt.Errorf("bad info: %s", err)
321         }
322         defer t.updateWantPeersEvent()
323         t.info = &info
324         t.displayName = "" // Save a few bytes lol.
325         t.cl.event.Broadcast()
326         t.gotMetainfo.Set()
327         t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
328         if err != nil {
329                 return fmt.Errorf("error opening torrent storage: %s", err)
330         }
331         var l int64
332         for _, f := range t.info.UpvertedFiles() {
333                 l += f.Length
334         }
335         t.length = &l
336         t.metadataBytes = b
337         t.metadataCompletedChunks = nil
338         t.makePieces()
339         for conn := range t.conns {
340                 if err := conn.setNumPieces(t.numPieces()); err != nil {
341                         log.Printf("closing connection: %s", err)
342                         conn.Close()
343                 }
344         }
345         for i := range t.pieces {
346                 t.updatePieceCompletion(i)
347                 p := &t.pieces[i]
348                 if !p.storageCompletionOk {
349                         // log.Printf("piece %s completion unknown, queueing check", p)
350                         t.queuePieceCheck(i)
351                 }
352         }
353         return nil
354 }
355
356 func (t *Torrent) haveAllMetadataPieces() bool {
357         if t.haveInfo() {
358                 return true
359         }
360         if t.metadataCompletedChunks == nil {
361                 return false
362         }
363         for _, have := range t.metadataCompletedChunks {
364                 if !have {
365                         return false
366                 }
367         }
368         return true
369 }
370
371 // TODO: Propagate errors to disconnect peer.
372 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
373         if t.haveInfo() {
374                 // We already know the correct metadata size.
375                 return
376         }
377         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
378                 return errors.New("bad size")
379         }
380         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
381                 return
382         }
383         t.metadataBytes = make([]byte, bytes)
384         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
385         t.metadataChanged.Broadcast()
386         for c := range t.conns {
387                 c.requestPendingMetadata()
388         }
389         return
390 }
391
392 // The current working name for the torrent. Either the name in the info dict,
393 // or a display name given such as by the dn value in a magnet link, or "".
394 func (t *Torrent) name() string {
395         if t.haveInfo() {
396                 return t.info.Name
397         }
398         return t.displayName
399 }
400
401 func (t *Torrent) pieceState(index int) (ret PieceState) {
402         p := &t.pieces[index]
403         ret.Priority = t.piecePriority(index)
404         if t.pieceComplete(index) {
405                 ret.Complete = true
406         }
407         if p.queuedForHash() || p.hashing {
408                 ret.Checking = true
409         }
410         if !ret.Complete && t.piecePartiallyDownloaded(index) {
411                 ret.Partial = true
412         }
413         return
414 }
415
416 func (t *Torrent) metadataPieceSize(piece int) int {
417         return metadataPieceSize(len(t.metadataBytes), piece)
418 }
419
420 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
421         d := map[string]int{
422                 "msg_type": msgType,
423                 "piece":    piece,
424         }
425         if data != nil {
426                 d["total_size"] = len(t.metadataBytes)
427         }
428         p, err := bencode.Marshal(d)
429         if err != nil {
430                 panic(err)
431         }
432         return pp.Message{
433                 Type:            pp.Extended,
434                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
435                 ExtendedPayload: append(p, data...),
436         }
437 }
438
439 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
440         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
441                 ret = append(ret, PieceStateRun{
442                         PieceState: el.(PieceState),
443                         Length:     int(count),
444                 })
445         })
446         for index := range t.pieces {
447                 rle.Append(t.pieceState(index), 1)
448         }
449         rle.Flush()
450         return
451 }
452
453 // Produces a small string representing a PieceStateRun.
454 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
455         ret = fmt.Sprintf("%d", psr.Length)
456         ret += func() string {
457                 switch psr.Priority {
458                 case PiecePriorityNext:
459                         return "N"
460                 case PiecePriorityNormal:
461                         return "."
462                 case PiecePriorityReadahead:
463                         return "R"
464                 case PiecePriorityNow:
465                         return "!"
466                 default:
467                         return ""
468                 }
469         }()
470         if psr.Checking {
471                 ret += "H"
472         }
473         if psr.Partial {
474                 ret += "P"
475         }
476         if psr.Complete {
477                 ret += "C"
478         }
479         return
480 }
481
482 func (t *Torrent) writeStatus(w io.Writer) {
483         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
484         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
485         if !t.haveInfo() {
486                 fmt.Fprintf(w, "Metadata have: ")
487                 for _, h := range t.metadataCompletedChunks {
488                         fmt.Fprintf(w, "%c", func() rune {
489                                 if h {
490                                         return 'H'
491                                 } else {
492                                         return '.'
493                                 }
494                         }())
495                 }
496                 fmt.Fprintln(w)
497         }
498         fmt.Fprintf(w, "Piece length: %s\n", func() string {
499                 if t.haveInfo() {
500                         return fmt.Sprint(t.usualPieceSize())
501                 } else {
502                         return "?"
503                 }
504         }())
505         if t.info != nil {
506                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
507                 fmt.Fprint(w, "Piece States:")
508                 for _, psr := range t.pieceStateRuns() {
509                         w.Write([]byte(" "))
510                         w.Write([]byte(pieceStateRunStatusChars(psr)))
511                 }
512                 fmt.Fprintln(w)
513         }
514         fmt.Fprintf(w, "Reader Pieces:")
515         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
516                 fmt.Fprintf(w, " %d:%d", begin, end)
517                 return true
518         })
519         fmt.Fprintln(w)
520
521         fmt.Fprintf(w, "Trackers:\n")
522         func() {
523                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
524                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
525                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
526                         return l.url < r.url
527                 }).([]*trackerScraper) {
528                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
529                 }
530                 tw.Flush()
531         }()
532
533         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
534
535         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
536         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
537         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
538         conns := t.connsAsSlice()
539         slices.Sort(conns, worseConn)
540         for i, c := range conns {
541                 fmt.Fprintf(w, "%2d. ", i+1)
542                 c.WriteStatus(w, t)
543         }
544 }
545
546 func (t *Torrent) haveInfo() bool {
547         return t.info != nil
548 }
549
550 // Returns a run-time generated MetaInfo that includes the info bytes and
551 // announce-list as currently known to the client.
552 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
553         return metainfo.MetaInfo{
554                 CreationDate: time.Now().Unix(),
555                 Comment:      "dynamic metainfo from client",
556                 CreatedBy:    "go.torrent",
557                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
558                 InfoBytes: func() []byte {
559                         if t.haveInfo() {
560                                 return t.metadataBytes
561                         } else {
562                                 return nil
563                         }
564                 }(),
565         }
566 }
567
568 func (t *Torrent) BytesMissing() int64 {
569         t.mu().RLock()
570         defer t.mu().RUnlock()
571         return t.bytesMissingLocked()
572 }
573
574 func (t *Torrent) bytesMissingLocked() int64 {
575         return t.bytesLeft()
576 }
577
578 func (t *Torrent) bytesLeft() (left int64) {
579         bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
580                 p := &t.pieces[piece]
581                 left += int64(p.length() - p.numDirtyBytes())
582                 return true
583         })
584         return
585 }
586
587 // Bytes left to give in tracker announces.
588 func (t *Torrent) bytesLeftAnnounce() uint64 {
589         if t.haveInfo() {
590                 return uint64(t.bytesLeft())
591         } else {
592                 return math.MaxUint64
593         }
594 }
595
596 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
597         if t.pieceComplete(piece) {
598                 return false
599         }
600         if t.pieceAllDirty(piece) {
601                 return false
602         }
603         return t.pieces[piece].hasDirtyChunks()
604 }
605
606 func (t *Torrent) usualPieceSize() int {
607         return int(t.info.PieceLength)
608 }
609
610 func (t *Torrent) numPieces() int {
611         return t.info.NumPieces()
612 }
613
614 func (t *Torrent) numPiecesCompleted() (num int) {
615         return t.completedPieces.Len()
616 }
617
618 func (t *Torrent) close() (err error) {
619         t.closed.Set()
620         if t.storage != nil {
621                 t.storageLock.Lock()
622                 t.storage.Close()
623                 t.storageLock.Unlock()
624         }
625         for conn := range t.conns {
626                 conn.Close()
627         }
628         t.cl.event.Broadcast()
629         t.pieceStateChanges.Close()
630         t.updateWantPeersEvent()
631         return
632 }
633
634 func (t *Torrent) requestOffset(r request) int64 {
635         return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
636 }
637
638 // Return the request that would include the given offset into the torrent
639 // data. Returns !ok if there is no such request.
640 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
641         return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
642 }
643
644 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
645         tr := perf.NewTimer()
646
647         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
648         if err == nil && n != len(data) {
649                 err = io.ErrShortWrite
650         }
651         if err == nil {
652                 tr.Mark("write chunk")
653         }
654         return
655 }
656
657 func (t *Torrent) bitfield() (bf []bool) {
658         bf = make([]bool, t.numPieces())
659         t.completedPieces.IterTyped(func(piece int) (again bool) {
660                 bf[piece] = true
661                 return true
662         })
663         return
664 }
665
666 func (t *Torrent) pieceNumChunks(piece int) int {
667         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
668 }
669
670 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
671         t.pieces[pieceIndex].dirtyChunks.Clear()
672 }
673
674 type Peer struct {
675         Id     [20]byte
676         IP     net.IP
677         Port   int
678         Source peerSource
679         // Peer is known to support encryption.
680         SupportsEncryption bool
681 }
682
683 func (t *Torrent) pieceLength(piece int) pp.Integer {
684         if piece == t.numPieces()-1 {
685                 ret := pp.Integer(*t.length % t.info.PieceLength)
686                 if ret != 0 {
687                         return ret
688                 }
689         }
690         return pp.Integer(t.info.PieceLength)
691 }
692
693 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
694         hash := pieceHash.New()
695         p := &t.pieces[piece]
696         p.waitNoPendingWrites()
697         ip := t.info.Piece(piece)
698         pl := ip.Length()
699         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
700         if n == pl {
701                 missinggo.CopyExact(&ret, hash.Sum(nil))
702                 return
703         }
704         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
705                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
706         }
707         return
708 }
709
710 func (t *Torrent) haveAnyPieces() bool {
711         for i := range t.pieces {
712                 if t.pieceComplete(i) {
713                         return true
714                 }
715         }
716         return false
717 }
718
719 func (t *Torrent) havePiece(index int) bool {
720         return t.haveInfo() && t.pieceComplete(index)
721 }
722
723 func (t *Torrent) haveChunk(r request) (ret bool) {
724         // defer func() {
725         //      log.Println("have chunk", r, ret)
726         // }()
727         if !t.haveInfo() {
728                 return false
729         }
730         if t.pieceComplete(int(r.Index)) {
731                 return true
732         }
733         p := &t.pieces[r.Index]
734         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
735 }
736
737 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
738         return int(cs.Begin / chunkSize)
739 }
740
741 func (t *Torrent) wantPiece(r request) bool {
742         if !t.wantPieceIndex(int(r.Index)) {
743                 return false
744         }
745         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
746                 return true
747         }
748         // TODO: What about pieces that were wanted, but aren't now, and aren't
749         // completed either? That used to be done here.
750         return false
751 }
752
753 func (t *Torrent) wantPieceIndex(index int) bool {
754         if !t.haveInfo() {
755                 return false
756         }
757         if index < 0 || index >= t.numPieces() {
758                 return false
759         }
760         p := &t.pieces[index]
761         if p.queuedForHash() {
762                 return false
763         }
764         if p.hashing {
765                 return false
766         }
767         if t.pieceComplete(index) {
768                 return false
769         }
770         if t.pendingPieces.Contains(index) {
771                 return true
772         }
773         return !t.forReaderOffsetPieces(func(begin, end int) bool {
774                 return index < begin || index >= end
775         })
776 }
777
778 // The worst connection is one that hasn't been sent, or sent anything useful
779 // for the longest. A bad connection is one that usually sends us unwanted
780 // pieces, or has been in worser half of the established connections for more
781 // than a minute.
782 func (t *Torrent) worstBadConn() *connection {
783         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
784         heap.Init(&wcs)
785         for wcs.Len() != 0 {
786                 c := heap.Pop(&wcs).(*connection)
787                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
788                         return c
789                 }
790                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
791                         // Give connections 1 minute to prove themselves.
792                         if time.Since(c.completedHandshake) > time.Minute {
793                                 return c
794                         }
795                 }
796         }
797         return nil
798 }
799
800 type PieceStateChange struct {
801         Index int
802         PieceState
803 }
804
805 func (t *Torrent) publishPieceChange(piece int) {
806         cur := t.pieceState(piece)
807         p := &t.pieces[piece]
808         if cur != p.publicPieceState {
809                 p.publicPieceState = cur
810                 t.pieceStateChanges.Publish(PieceStateChange{
811                         piece,
812                         cur,
813                 })
814         }
815 }
816
817 func (t *Torrent) pieceNumPendingChunks(piece int) int {
818         if t.pieceComplete(piece) {
819                 return 0
820         }
821         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
822 }
823
824 func (t *Torrent) pieceAllDirty(piece int) bool {
825         return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
826 }
827
828 func (t *Torrent) readersChanged() {
829         t.updateReaderPieces()
830         t.updateAllPiecePriorities()
831 }
832
833 func (t *Torrent) updateReaderPieces() {
834         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
835 }
836
837 func (t *Torrent) readerPosChanged(from, to pieceRange) {
838         if from == to {
839                 return
840         }
841         t.updateReaderPieces()
842         // Order the ranges, high and low.
843         l, h := from, to
844         if l.begin > h.begin {
845                 l, h = h, l
846         }
847         if l.end < h.begin {
848                 // Two distinct ranges.
849                 t.updatePiecePriorities(l.begin, l.end)
850                 t.updatePiecePriorities(h.begin, h.end)
851         } else {
852                 // Ranges overlap.
853                 end := l.end
854                 if h.end > end {
855                         end = h.end
856                 }
857                 t.updatePiecePriorities(l.begin, end)
858         }
859 }
860
861 func (t *Torrent) maybeNewConns() {
862         // Tickle the accept routine.
863         t.cl.event.Broadcast()
864         t.openNewConns()
865 }
866
867 func (t *Torrent) piecePriorityChanged(piece int) {
868         for c := range t.conns {
869                 if c.updatePiecePriority(piece) {
870                         c.updateRequests()
871                 }
872         }
873         t.maybeNewConns()
874         t.publishPieceChange(piece)
875 }
876
877 func (t *Torrent) updatePiecePriority(piece int) {
878         p := &t.pieces[piece]
879         newPrio := t.piecePriorityUncached(piece)
880         if newPrio == p.priority {
881                 return
882         }
883         p.priority = newPrio
884         t.piecePriorityChanged(piece)
885 }
886
887 func (t *Torrent) updateAllPiecePriorities() {
888         t.updatePiecePriorities(0, len(t.pieces))
889 }
890
891 // Update all piece priorities in one hit. This function should have the same
892 // output as updatePiecePriority, but across all pieces.
893 func (t *Torrent) updatePiecePriorities(begin, end int) {
894         for i := begin; i < end; i++ {
895                 t.updatePiecePriority(i)
896         }
897 }
898
899 // Returns the range of pieces [begin, end) that contains the extent of bytes.
900 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
901         if off >= *t.length {
902                 return
903         }
904         if off < 0 {
905                 size += off
906                 off = 0
907         }
908         if size <= 0 {
909                 return
910         }
911         begin = int(off / t.info.PieceLength)
912         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
913         if end > t.info.NumPieces() {
914                 end = t.info.NumPieces()
915         }
916         return
917 }
918
919 // Returns true if all iterations complete without breaking. Returns the read
920 // regions for all readers. The reader regions should not be merged as some
921 // callers depend on this method to enumerate readers.
922 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
923         for r := range t.readers {
924                 p := r.pieces
925                 if p.begin >= p.end {
926                         continue
927                 }
928                 if !f(p.begin, p.end) {
929                         return false
930                 }
931         }
932         return true
933 }
934
935 func (t *Torrent) piecePriority(piece int) piecePriority {
936         if !t.haveInfo() {
937                 return PiecePriorityNone
938         }
939         return t.pieces[piece].priority
940 }
941
942 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
943         if t.pieceComplete(piece) {
944                 return PiecePriorityNone
945         }
946         if t.readerNowPieces.Contains(piece) {
947                 return PiecePriorityNow
948         }
949         // if t.readerNowPieces.Contains(piece - 1) {
950         //      return PiecePriorityNext
951         // }
952         if t.readerReadaheadPieces.Contains(piece) {
953                 return PiecePriorityReadahead
954         }
955         if t.pendingPieces.Contains(piece) {
956                 return PiecePriorityNormal
957         }
958         return PiecePriorityNone
959 }
960
961 func (t *Torrent) pendPiece(piece int) {
962         if t.pendingPieces.Contains(piece) {
963                 return
964         }
965         if t.havePiece(piece) {
966                 return
967         }
968         t.pendingPieces.Add(piece)
969         t.updatePiecePriority(piece)
970 }
971
972 func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) {
973         t.pendingPieces.Sub(unpend)
974         unpend.IterTyped(func(piece int) (again bool) {
975                 t.updatePiecePriority(piece)
976                 return true
977         })
978 }
979
980 func (t *Torrent) pendPieceRange(begin, end int) {
981         for i := begin; i < end; i++ {
982                 t.pendPiece(i)
983         }
984 }
985
986 func (t *Torrent) unpendPieceRange(begin, end int) {
987         var bm bitmap.Bitmap
988         bm.AddRange(begin, end)
989         t.unpendPieces(bm)
990 }
991
992 func (t *Torrent) pendRequest(req request) {
993         ci := chunkIndex(req.chunkSpec, t.chunkSize)
994         t.pieces[req.Index].pendChunkIndex(ci)
995 }
996
997 func (t *Torrent) pieceCompletionChanged(piece int) {
998         t.cl.event.Broadcast()
999         if t.pieceComplete(piece) {
1000                 t.onPieceCompleted(piece)
1001         } else {
1002                 t.onIncompletePiece(piece)
1003         }
1004         t.updatePiecePriority(piece)
1005 }
1006
1007 func (t *Torrent) openNewConns() {
1008         t.cl.openNewConns(t)
1009 }
1010
1011 func (t *Torrent) getConnPieceInclination() []int {
1012         _ret := t.connPieceInclinationPool.Get()
1013         if _ret == nil {
1014                 pieceInclinationsNew.Add(1)
1015                 return rand.Perm(t.numPieces())
1016         }
1017         pieceInclinationsReused.Add(1)
1018         return *_ret.(*[]int)
1019 }
1020
1021 func (t *Torrent) putPieceInclination(pi []int) {
1022         t.connPieceInclinationPool.Put(&pi)
1023         pieceInclinationsPut.Add(1)
1024 }
1025
1026 func (t *Torrent) updatePieceCompletion(piece int) {
1027         pcu := t.pieceCompleteUncached(piece)
1028         p := &t.pieces[piece]
1029         changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1030         p.storageCompletionOk = pcu.Ok
1031         t.completedPieces.Set(piece, pcu.Complete)
1032         if changed {
1033                 t.pieceCompletionChanged(piece)
1034         }
1035 }
1036
1037 // Non-blocking read. Client lock is not required.
1038 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1039         p := &t.pieces[off/t.info.PieceLength]
1040         p.waitNoPendingWrites()
1041         return p.Storage().ReadAt(b, off-p.Info().Offset())
1042 }
1043
1044 func (t *Torrent) updateAllPieceCompletions() {
1045         for i := range iter.N(t.numPieces()) {
1046                 t.updatePieceCompletion(i)
1047         }
1048 }
1049
1050 // Returns an error if the metadata was completed, but couldn't be set for
1051 // some reason. Blame it on the last peer to contribute.
1052 func (t *Torrent) maybeCompleteMetadata() error {
1053         if t.haveInfo() {
1054                 // Nothing to do.
1055                 return nil
1056         }
1057         if !t.haveAllMetadataPieces() {
1058                 // Don't have enough metadata pieces.
1059                 return nil
1060         }
1061         err := t.setInfoBytes(t.metadataBytes)
1062         if err != nil {
1063                 t.invalidateMetadata()
1064                 return fmt.Errorf("error setting info bytes: %s", err)
1065         }
1066         if t.cl.config.Debug {
1067                 log.Printf("%s: got metadata from peers", t)
1068         }
1069         return nil
1070 }
1071
1072 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1073         t.forReaderOffsetPieces(func(begin, end int) bool {
1074                 ret.AddRange(begin, end)
1075                 return true
1076         })
1077         return
1078 }
1079
1080 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1081         t.forReaderOffsetPieces(func(begin, end int) bool {
1082                 if end > begin {
1083                         now.Add(begin)
1084                         readahead.AddRange(begin+1, end)
1085                 }
1086                 return true
1087         })
1088         return
1089 }
1090
1091 func (t *Torrent) needData() bool {
1092         if t.closed.IsSet() {
1093                 return false
1094         }
1095         if !t.haveInfo() {
1096                 return true
1097         }
1098         if t.pendingPieces.Len() != 0 {
1099                 return true
1100         }
1101         // Read as "not all complete".
1102         return !t.readerPieces().IterTyped(func(piece int) bool {
1103                 return t.pieceComplete(piece)
1104         })
1105 }
1106
1107 func appendMissingStrings(old, new []string) (ret []string) {
1108         ret = old
1109 new:
1110         for _, n := range new {
1111                 for _, o := range old {
1112                         if o == n {
1113                                 continue new
1114                         }
1115                 }
1116                 ret = append(ret, n)
1117         }
1118         return
1119 }
1120
1121 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1122         ret = existing
1123         for minNumTiers > len(ret) {
1124                 ret = append(ret, nil)
1125         }
1126         return
1127 }
1128
1129 func (t *Torrent) addTrackers(announceList [][]string) {
1130         fullAnnounceList := &t.metainfo.AnnounceList
1131         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1132         for tierIndex, trackerURLs := range announceList {
1133                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1134         }
1135         t.startMissingTrackerScrapers()
1136         t.updateWantPeersEvent()
1137 }
1138
1139 // Don't call this before the info is available.
1140 func (t *Torrent) bytesCompleted() int64 {
1141         if !t.haveInfo() {
1142                 return 0
1143         }
1144         return t.info.TotalLength() - t.bytesLeft()
1145 }
1146
1147 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1148         t.cl.mu.Lock()
1149         defer t.cl.mu.Unlock()
1150         return t.setInfoBytes(b)
1151 }
1152
1153 // Returns true if connection is removed from torrent.Conns.
1154 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1155         _, ret = t.conns[c]
1156         delete(t.conns, c)
1157         return
1158 }
1159
1160 func (t *Torrent) dropConnection(c *connection) {
1161         t.cl.event.Broadcast()
1162         c.Close()
1163         if t.deleteConnection(c) {
1164                 t.openNewConns()
1165         }
1166 }
1167
1168 func (t *Torrent) wantPeers() bool {
1169         if t.closed.IsSet() {
1170                 return false
1171         }
1172         if len(t.peers) > t.cl.config.TorrentPeersLowWater {
1173                 return false
1174         }
1175         return t.needData() || t.seeding()
1176 }
1177
1178 func (t *Torrent) updateWantPeersEvent() {
1179         if t.wantPeers() {
1180                 t.wantPeersEvent.Set()
1181         } else {
1182                 t.wantPeersEvent.Clear()
1183         }
1184 }
1185
1186 // Returns whether the client should make effort to seed the torrent.
1187 func (t *Torrent) seeding() bool {
1188         cl := t.cl
1189         if t.closed.IsSet() {
1190                 return false
1191         }
1192         if cl.config.NoUpload {
1193                 return false
1194         }
1195         if !cl.config.Seed {
1196                 return false
1197         }
1198         if cl.config.DisableAggressiveUpload && t.needData() {
1199                 return false
1200         }
1201         return true
1202 }
1203
1204 func (t *Torrent) startScrapingTracker(url string) {
1205         if url == "" {
1206                 return
1207         }
1208         if _, ok := t.trackerAnnouncers[url]; ok {
1209                 return
1210         }
1211         newAnnouncer := &trackerScraper{
1212                 url: url,
1213                 t:   t,
1214         }
1215         if t.trackerAnnouncers == nil {
1216                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1217         }
1218         t.trackerAnnouncers[url] = newAnnouncer
1219         go newAnnouncer.Run()
1220 }
1221
1222 // Adds and starts tracker scrapers for tracker URLs that aren't already
1223 // running.
1224 func (t *Torrent) startMissingTrackerScrapers() {
1225         if t.cl.config.DisableTrackers {
1226                 return
1227         }
1228         t.startScrapingTracker(t.metainfo.Announce)
1229         for _, tier := range t.metainfo.AnnounceList {
1230                 for _, url := range tier {
1231                         t.startScrapingTracker(url)
1232                 }
1233         }
1234 }
1235
1236 // Returns an AnnounceRequest with fields filled out to defaults and current
1237 // values.
1238 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1239         return tracker.AnnounceRequest{
1240                 Event:    tracker.None,
1241                 NumWant:  -1,
1242                 Port:     uint16(t.cl.incomingPeerPort()),
1243                 PeerId:   t.cl.peerID,
1244                 InfoHash: t.infoHash,
1245                 Left:     t.bytesLeftAnnounce(),
1246         }
1247 }
1248
1249 // Adds peers revealed in an announce until the announce ends, or we have
1250 // enough peers.
1251 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1252         cl := t.cl
1253         // Count all the unique addresses we got during this announce.
1254         allAddrs := make(map[string]struct{})
1255         for {
1256                 select {
1257                 case v, ok := <-pvs:
1258                         if !ok {
1259                                 return
1260                         }
1261                         addPeers := make([]Peer, 0, len(v.Peers))
1262                         for _, cp := range v.Peers {
1263                                 if cp.Port == 0 {
1264                                         // Can't do anything with this.
1265                                         continue
1266                                 }
1267                                 addPeers = append(addPeers, Peer{
1268                                         IP:     cp.IP[:],
1269                                         Port:   cp.Port,
1270                                         Source: peerSourceDHTGetPeers,
1271                                 })
1272                                 key := (&net.UDPAddr{
1273                                         IP:   cp.IP[:],
1274                                         Port: cp.Port,
1275                                 }).String()
1276                                 allAddrs[key] = struct{}{}
1277                         }
1278                         cl.mu.Lock()
1279                         t.addPeers(addPeers)
1280                         numPeers := len(t.peers)
1281                         cl.mu.Unlock()
1282                         if numPeers >= cl.config.TorrentPeersHighWater {
1283                                 return
1284                         }
1285                 case <-t.closed.LockedChan(&cl.mu):
1286                         return
1287                 }
1288         }
1289 }
1290
1291 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1292         cl := t.cl
1293         ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1294         if err != nil {
1295                 return
1296         }
1297         t.consumeDHTAnnounce(ps.Peers)
1298         ps.Close()
1299         return
1300 }
1301
1302 func (t *Torrent) dhtAnnouncer() {
1303         cl := t.cl
1304         for {
1305                 select {
1306                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1307                 case <-t.closed.LockedChan(&cl.mu):
1308                         return
1309                 }
1310                 err := t.announceDHT(true)
1311                 func() {
1312                         cl.mu.Lock()
1313                         defer cl.mu.Unlock()
1314                         if err == nil {
1315                                 t.numDHTAnnounces++
1316                         } else {
1317                                 log.Printf("error announcing %q to DHT: %s", t, err)
1318                         }
1319                 }()
1320                 select {
1321                 case <-t.closed.LockedChan(&cl.mu):
1322                         return
1323                 case <-time.After(5 * time.Minute):
1324                 }
1325         }
1326 }
1327
1328 func (t *Torrent) addPeers(peers []Peer) {
1329         for _, p := range peers {
1330                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1331                         continue
1332                 }
1333                 t.addPeer(p)
1334         }
1335 }
1336
1337 func (t *Torrent) Stats() TorrentStats {
1338         t.cl.mu.Lock()
1339         defer t.cl.mu.Unlock()
1340
1341         t.stats.ActivePeers = len(t.conns)
1342         t.stats.HalfOpenPeers = len(t.halfOpen)
1343         t.stats.PendingPeers = len(t.peers)
1344         t.stats.TotalPeers = t.numTotalPeers()
1345
1346         return t.stats
1347 }
1348
1349 // The total number of peers in the torrent.
1350 func (t *Torrent) numTotalPeers() int {
1351         peers := make(map[string]struct{})
1352         for conn := range t.conns {
1353                 ra := conn.conn.RemoteAddr()
1354                 if ra == nil {
1355                         // It's been closed and doesn't support RemoteAddr.
1356                         continue
1357                 }
1358                 peers[ra.String()] = struct{}{}
1359         }
1360         for addr := range t.halfOpen {
1361                 peers[addr] = struct{}{}
1362         }
1363         for _, peer := range t.peers {
1364                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1365         }
1366         return len(peers)
1367 }
1368
1369 // Returns true if the connection is added.
1370 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1371         if t.cl.closed.IsSet() {
1372                 return false
1373         }
1374         if !t.wantConns() {
1375                 return false
1376         }
1377         for c0 := range t.conns {
1378                 if c.PeerID == c0.PeerID {
1379                         // Already connected to a client with that ID.
1380                         duplicateClientConns.Add(1)
1381                         lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1382                         // Retain the connection from initiated from lower peer ID to
1383                         // higher.
1384                         if outgoing == lower {
1385                                 // Close the other one.
1386                                 c0.Close()
1387                                 // TODO: Is it safe to delete from the map while we're
1388                                 // iterating over it?
1389                                 t.deleteConnection(c0)
1390                         } else {
1391                                 // Abandon this one.
1392                                 return false
1393                         }
1394                 }
1395         }
1396         if len(t.conns) >= t.maxEstablishedConns {
1397                 c := t.worstBadConn()
1398                 if c == nil {
1399                         return false
1400                 }
1401                 if t.cl.config.Debug && missinggo.CryHeard() {
1402                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1403                 }
1404                 c.Close()
1405                 t.deleteConnection(c)
1406         }
1407         if len(t.conns) >= t.maxEstablishedConns {
1408                 panic(len(t.conns))
1409         }
1410         if c.t != nil {
1411                 panic("connection already associated with a torrent")
1412         }
1413         // Reconcile bytes transferred before connection was associated with a
1414         // torrent.
1415         t.stats.wroteBytes(c.stats.BytesWritten)
1416         t.stats.readBytes(c.stats.BytesRead)
1417         c.t = t
1418         t.conns[c] = struct{}{}
1419         return true
1420 }
1421
1422 func (t *Torrent) wantConns() bool {
1423         if !t.networkingEnabled {
1424                 return false
1425         }
1426         if t.closed.IsSet() {
1427                 return false
1428         }
1429         if !t.seeding() && !t.needData() {
1430                 return false
1431         }
1432         if len(t.conns) < t.maxEstablishedConns {
1433                 return true
1434         }
1435         return t.worstBadConn() != nil
1436 }
1437
1438 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1439         t.cl.mu.Lock()
1440         defer t.cl.mu.Unlock()
1441         oldMax = t.maxEstablishedConns
1442         t.maxEstablishedConns = max
1443         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1444         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1445                 t.dropConnection(wcs.Pop().(*connection))
1446         }
1447         t.openNewConns()
1448         return oldMax
1449 }
1450
1451 func (t *Torrent) mu() missinggo.RWLocker {
1452         return &t.cl.mu
1453 }
1454
1455 func (t *Torrent) pieceHashed(piece int, correct bool) {
1456         if t.closed.IsSet() {
1457                 return
1458         }
1459         p := &t.pieces[piece]
1460         touchers := t.reapPieceTouchers(piece)
1461         if p.everHashed {
1462                 // Don't score the first time a piece is hashed, it could be an
1463                 // initial check.
1464                 if correct {
1465                         pieceHashedCorrect.Add(1)
1466                 } else {
1467                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1468                         pieceHashedNotCorrect.Add(1)
1469                 }
1470         }
1471         p.everHashed = true
1472         if correct {
1473                 for _, c := range touchers {
1474                         c.goodPiecesDirtied++
1475                 }
1476                 err := p.Storage().MarkComplete()
1477                 if err != nil {
1478                         log.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
1479                 }
1480                 t.updatePieceCompletion(piece)
1481         } else {
1482                 if len(touchers) != 0 {
1483                         for _, c := range touchers {
1484                                 // Y u do dis peer?!
1485                                 c.badPiecesDirtied++
1486                         }
1487                         slices.Sort(touchers, connLessTrusted)
1488                         if t.cl.config.Debug {
1489                                 log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1490                                         for _, c := range touchers {
1491                                                 ret = append(ret, c.netGoodPiecesDirtied())
1492                                         }
1493                                         return
1494                                 }())
1495                         }
1496                         c := touchers[0]
1497                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1498                         c.Drop()
1499                 }
1500                 t.onIncompletePiece(piece)
1501         }
1502 }
1503
1504 func (t *Torrent) cancelRequestsForPiece(piece int) {
1505         // TODO: Make faster
1506         for cn := range t.conns {
1507                 cn.tickleWriter()
1508         }
1509 }
1510
1511 func (t *Torrent) onPieceCompleted(piece int) {
1512         t.pendingPieces.Remove(piece)
1513         t.pendAllChunkSpecs(piece)
1514         t.cancelRequestsForPiece(piece)
1515         for conn := range t.conns {
1516                 conn.Have(piece)
1517         }
1518 }
1519
1520 func (t *Torrent) onIncompletePiece(piece int) {
1521         if t.pieceAllDirty(piece) {
1522                 t.pendAllChunkSpecs(piece)
1523         }
1524         if !t.wantPieceIndex(piece) {
1525                 return
1526         }
1527         // We could drop any connections that we told we have a piece that we
1528         // don't here. But there's a test failure, and it seems clients don't care
1529         // if you request pieces that you already claim to have. Pruning bad
1530         // connections might just remove any connections that aren't treating us
1531         // favourably anyway.
1532
1533         // for c := range t.conns {
1534         //      if c.sentHave(piece) {
1535         //              c.Drop()
1536         //      }
1537         // }
1538         for conn := range t.conns {
1539                 if conn.PeerHasPiece(piece) {
1540                         conn.updateRequests()
1541                 }
1542         }
1543 }
1544
1545 func (t *Torrent) verifyPiece(piece int) {
1546         cl := t.cl
1547         cl.mu.Lock()
1548         defer cl.mu.Unlock()
1549         p := &t.pieces[piece]
1550         defer func() {
1551                 p.numVerifies++
1552                 cl.event.Broadcast()
1553         }()
1554         for p.hashing || t.storage == nil {
1555                 cl.event.Wait()
1556         }
1557         if !p.t.piecesQueuedForHash.Remove(piece) {
1558                 panic("piece was not queued")
1559         }
1560         if t.closed.IsSet() || t.pieceComplete(piece) {
1561                 t.updatePiecePriority(piece)
1562                 return
1563         }
1564         p.hashing = true
1565         t.publishPieceChange(piece)
1566         t.storageLock.RLock()
1567         cl.mu.Unlock()
1568         sum := t.hashPiece(piece)
1569         t.storageLock.RUnlock()
1570         cl.mu.Lock()
1571         p.hashing = false
1572         t.pieceHashed(piece, sum == p.hash)
1573         t.publishPieceChange(piece)
1574 }
1575
1576 // Return the connections that touched a piece, and clear the entry while
1577 // doing it.
1578 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1579         for c := range t.conns {
1580                 if _, ok := c.peerTouchedPieces[piece]; ok {
1581                         ret = append(ret, c)
1582                         delete(c.peerTouchedPieces, piece)
1583                 }
1584         }
1585         return
1586 }
1587
1588 func (t *Torrent) connsAsSlice() (ret []*connection) {
1589         for c := range t.conns {
1590                 ret = append(ret, c)
1591         }
1592         return
1593 }
1594
1595 // Currently doesn't really queue, but should in the future.
1596 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1597         piece := &t.pieces[pieceIndex]
1598         if piece.queuedForHash() {
1599                 return
1600         }
1601         t.piecesQueuedForHash.Add(pieceIndex)
1602         t.publishPieceChange(pieceIndex)
1603         go t.verifyPiece(pieceIndex)
1604 }
1605
1606 func (t *Torrent) VerifyData() {
1607         for i := range iter.N(t.NumPieces()) {
1608                 t.Piece(i).VerifyData()
1609         }
1610 }