]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Track completion known to implementation state
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math"
11         "math/rand"
12         "net"
13         "os"
14         "strconv"
15         "sync"
16         "text/tabwriter"
17         "time"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/bitmap"
22         "github.com/anacrolix/missinggo/perf"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/missinggo/slices"
25         "github.com/bradfitz/iter"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/metainfo"
29         pp "github.com/anacrolix/torrent/peer_protocol"
30         "github.com/anacrolix/torrent/storage"
31         "github.com/anacrolix/torrent/tracker"
32 )
33
34 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
35         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
36 }
37
38 type peersKey struct {
39         IPBytes string
40         Port    int
41 }
42
43 // Maintains state of torrent within a Client.
44 type Torrent struct {
45         cl *Client
46
47         networkingEnabled bool
48         requestStrategy   int
49
50         closed   missinggo.Event
51         infoHash metainfo.Hash
52         pieces   []Piece
53         // Values are the piece indices that changed.
54         pieceStateChanges *pubsub.PubSub
55         // The size of chunks to request from peers over the wire. This is
56         // normally 16KiB by convention these days.
57         chunkSize pp.Integer
58         chunkPool *sync.Pool
59         // Total length of the torrent in bytes. Stored because it's not O(1) to
60         // get this from the info dict.
61         length int64
62
63         // The storage to open when the info dict becomes available.
64         storageOpener *storage.Client
65         // Storage for torrent data.
66         storage *storage.Torrent
67
68         metainfo metainfo.MetaInfo
69
70         // The info dict. nil if we don't have it (yet).
71         info *metainfo.Info
72
73         // Active peer connections, running message stream loops.
74         conns               map[*connection]struct{}
75         maxEstablishedConns int
76         // Set of addrs to which we're attempting to connect. Connections are
77         // half-open until all handshakes are completed.
78         halfOpen    map[string]Peer
79         fastestConn *connection
80
81         // Reserve of peers to connect to. A peer can be both here and in the
82         // active connections if were told about the peer after connecting with
83         // them. That encourages us to reconnect to peers that are well known in
84         // the swarm.
85         peers          map[peersKey]Peer
86         wantPeersEvent missinggo.Event
87         // An announcer for each tracker URL.
88         trackerAnnouncers map[string]*trackerScraper
89         // How many times we've initiated a DHT announce. TODO: Move into stats.
90         numDHTAnnounces int
91
92         // Name used if the info name isn't available. Should be cleared when the
93         // Info does become available.
94         displayName string
95         // The bencoded bytes of the info dict. This is actively manipulated if
96         // the info bytes aren't initially available, and we try to fetch them
97         // from peers.
98         metadataBytes []byte
99         // Each element corresponds to the 16KiB metadata pieces. If true, we have
100         // received that piece.
101         metadataCompletedChunks []bool
102
103         // Set when .Info is obtained.
104         gotMetainfo missinggo.Event
105
106         readers               map[*Reader]struct{}
107         readerNowPieces       bitmap.Bitmap
108         readerReadaheadPieces bitmap.Bitmap
109
110         // The indexes of pieces we want with normal priority, that aren't
111         // currently available.
112         pendingPieces bitmap.Bitmap
113         // A cache of completed piece indices.
114         completedPieces bitmap.Bitmap
115         // Pieces that need to be hashed.
116         piecesQueuedForHash bitmap.Bitmap
117
118         // A pool of piece priorities []int for assignment to new connections.
119         // These "inclinations" are used to give connections preference for
120         // different pieces.
121         connPieceInclinationPool sync.Pool
122         // Torrent-level statistics.
123         stats TorrentStats
124 }
125
126 // Returns a channel that is closed when the Torrent is closed.
127 func (t *Torrent) Closed() <-chan struct{} {
128         return t.closed.LockedChan(&t.cl.mu)
129 }
130
131 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
132 // pending, and half-open peers.
133 func (t *Torrent) KnownSwarm() (ks []Peer) {
134         // Add pending peers to the list
135         for _, peer := range t.peers {
136                 ks = append(ks, peer)
137         }
138
139         // Add half-open peers to the list
140         for _, peer := range t.halfOpen {
141                 ks = append(ks, peer)
142         }
143
144         // Add active peers to the list
145         for conn := range t.conns {
146                 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
147                 if err != nil {
148                         panic(err)
149                 }
150
151                 ip := net.ParseIP(host)
152                 port, err := strconv.Atoi(portString)
153                 if err != nil {
154                         panic(err)
155                 }
156
157                 ks = append(ks, Peer{
158                         Id:     conn.PeerID,
159                         IP:     ip,
160                         Port:   port,
161                         Source: conn.Discovery,
162                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
163                         // > But if we're not connected to them with an encrypted connection, I couldn't say
164                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
165                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
166                         // > connection. It's probably easiest to do the latter for now.
167                         // https://github.com/anacrolix/torrent/pull/188
168                         SupportsEncryption: conn.headerEncrypted,
169                 })
170         }
171
172         return
173 }
174
175 func (t *Torrent) setChunkSize(size pp.Integer) {
176         t.chunkSize = size
177         t.chunkPool = &sync.Pool{
178                 New: func() interface{} {
179                         return make([]byte, size)
180                 },
181         }
182 }
183
184 func (t *Torrent) setDisplayName(dn string) {
185         if t.haveInfo() {
186                 return
187         }
188         t.displayName = dn
189 }
190
191 func (t *Torrent) pieceComplete(piece int) bool {
192         return t.completedPieces.Get(piece)
193 }
194
195 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
196         return t.pieces[piece].Storage().Completion()
197 }
198
199 // There's a connection to that address already.
200 func (t *Torrent) addrActive(addr string) bool {
201         if _, ok := t.halfOpen[addr]; ok {
202                 return true
203         }
204         for c := range t.conns {
205                 if c.remoteAddr().String() == addr {
206                         return true
207                 }
208         }
209         return false
210 }
211
212 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
213         ret = make([]*connection, 0, len(t.conns))
214         for c := range t.conns {
215                 if !c.closed.IsSet() {
216                         ret = append(ret, c)
217                 }
218         }
219         return
220 }
221
222 func (t *Torrent) addPeer(p Peer) {
223         cl := t.cl
224         cl.openNewConns(t)
225         if len(t.peers) >= torrentPeersHighWater {
226                 return
227         }
228         key := peersKey{string(p.IP), p.Port}
229         if _, ok := t.peers[key]; ok {
230                 return
231         }
232         t.peers[key] = p
233         peersAddedBySource.Add(string(p.Source), 1)
234         cl.openNewConns(t)
235
236 }
237
238 func (t *Torrent) invalidateMetadata() {
239         for i := range t.metadataCompletedChunks {
240                 t.metadataCompletedChunks[i] = false
241         }
242         t.info = nil
243 }
244
245 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
246         if t.haveInfo() {
247                 return
248         }
249         if index >= len(t.metadataCompletedChunks) {
250                 log.Printf("%s: ignoring metadata piece %d", t, index)
251                 return
252         }
253         copy(t.metadataBytes[(1<<14)*index:], data)
254         t.metadataCompletedChunks[index] = true
255 }
256
257 func (t *Torrent) metadataPieceCount() int {
258         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
259 }
260
261 func (t *Torrent) haveMetadataPiece(piece int) bool {
262         if t.haveInfo() {
263                 return (1<<14)*piece < len(t.metadataBytes)
264         } else {
265                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
266         }
267 }
268
269 func (t *Torrent) metadataSizeKnown() bool {
270         return t.metadataBytes != nil
271 }
272
273 func (t *Torrent) metadataSize() int {
274         return len(t.metadataBytes)
275 }
276
277 func infoPieceHashes(info *metainfo.Info) (ret []string) {
278         for i := 0; i < len(info.Pieces); i += sha1.Size {
279                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
280         }
281         return
282 }
283
284 func (t *Torrent) makePieces() {
285         hashes := infoPieceHashes(t.info)
286         t.pieces = make([]Piece, len(hashes))
287         for i, hash := range hashes {
288                 piece := &t.pieces[i]
289                 piece.t = t
290                 piece.index = i
291                 piece.noPendingWrites.L = &piece.pendingWritesMutex
292                 missinggo.CopyExact(piece.hash[:], hash)
293         }
294 }
295
296 // Called when metadata for a torrent becomes available.
297 func (t *Torrent) setInfoBytes(b []byte) error {
298         if t.haveInfo() {
299                 return nil
300         }
301         if metainfo.HashBytes(b) != t.infoHash {
302                 return errors.New("info bytes have wrong hash")
303         }
304         var info metainfo.Info
305         err := bencode.Unmarshal(b, &info)
306         if err != nil {
307                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
308         }
309         err = validateInfo(&info)
310         if err != nil {
311                 return fmt.Errorf("bad info: %s", err)
312         }
313         defer t.updateWantPeersEvent()
314         t.info = &info
315         t.displayName = "" // Save a few bytes lol.
316         t.cl.event.Broadcast()
317         t.gotMetainfo.Set()
318         t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
319         if err != nil {
320                 return fmt.Errorf("error opening torrent storage: %s", err)
321         }
322         t.length = 0
323         for _, f := range t.info.UpvertedFiles() {
324                 t.length += f.Length
325         }
326         t.metadataBytes = b
327         t.metadataCompletedChunks = nil
328         t.makePieces()
329         for conn := range t.conns {
330                 if err := conn.setNumPieces(t.numPieces()); err != nil {
331                         log.Printf("closing connection: %s", err)
332                         conn.Close()
333                 }
334         }
335         for i := range t.pieces {
336                 t.updatePieceCompletion(i)
337                 p := &t.pieces[i]
338                 if !p.storageCompletionOk {
339                         log.Printf("piece %s completion unknown, queueing check", p)
340                         t.queuePieceCheck(i)
341                 }
342         }
343         return nil
344 }
345
346 func (t *Torrent) haveAllMetadataPieces() bool {
347         if t.haveInfo() {
348                 return true
349         }
350         if t.metadataCompletedChunks == nil {
351                 return false
352         }
353         for _, have := range t.metadataCompletedChunks {
354                 if !have {
355                         return false
356                 }
357         }
358         return true
359 }
360
361 // TODO: Propagate errors to disconnect peer.
362 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
363         if t.haveInfo() {
364                 // We already know the correct metadata size.
365                 return
366         }
367         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
368                 return errors.New("bad size")
369         }
370         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
371                 return
372         }
373         t.metadataBytes = make([]byte, bytes)
374         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
375         for c := range t.conns {
376                 c.requestPendingMetadata()
377         }
378         return
379 }
380
381 // The current working name for the torrent. Either the name in the info dict,
382 // or a display name given such as by the dn value in a magnet link, or "".
383 func (t *Torrent) name() string {
384         if t.haveInfo() {
385                 return t.info.Name
386         }
387         return t.displayName
388 }
389
390 func (t *Torrent) pieceState(index int) (ret PieceState) {
391         p := &t.pieces[index]
392         ret.Priority = t.piecePriority(index)
393         if t.pieceComplete(index) {
394                 ret.Complete = true
395         }
396         if p.queuedForHash() || p.hashing {
397                 ret.Checking = true
398         }
399         if !ret.Complete && t.piecePartiallyDownloaded(index) {
400                 ret.Partial = true
401         }
402         return
403 }
404
405 func (t *Torrent) metadataPieceSize(piece int) int {
406         return metadataPieceSize(len(t.metadataBytes), piece)
407 }
408
409 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
410         d := map[string]int{
411                 "msg_type": msgType,
412                 "piece":    piece,
413         }
414         if data != nil {
415                 d["total_size"] = len(t.metadataBytes)
416         }
417         p, err := bencode.Marshal(d)
418         if err != nil {
419                 panic(err)
420         }
421         return pp.Message{
422                 Type:            pp.Extended,
423                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
424                 ExtendedPayload: append(p, data...),
425         }
426 }
427
428 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
429         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
430                 ret = append(ret, PieceStateRun{
431                         PieceState: el.(PieceState),
432                         Length:     int(count),
433                 })
434         })
435         for index := range t.pieces {
436                 rle.Append(t.pieceState(index), 1)
437         }
438         rle.Flush()
439         return
440 }
441
442 // Produces a small string representing a PieceStateRun.
443 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
444         ret = fmt.Sprintf("%d", psr.Length)
445         ret += func() string {
446                 switch psr.Priority {
447                 case PiecePriorityNext:
448                         return "N"
449                 case PiecePriorityNormal:
450                         return "."
451                 case PiecePriorityReadahead:
452                         return "R"
453                 case PiecePriorityNow:
454                         return "!"
455                 default:
456                         return ""
457                 }
458         }()
459         if psr.Checking {
460                 ret += "H"
461         }
462         if psr.Partial {
463                 ret += "P"
464         }
465         if psr.Complete {
466                 ret += "C"
467         }
468         return
469 }
470
471 func (t *Torrent) writeStatus(w io.Writer) {
472         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
473         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
474         if !t.haveInfo() {
475                 fmt.Fprintf(w, "Metadata have: ")
476                 for _, h := range t.metadataCompletedChunks {
477                         fmt.Fprintf(w, "%c", func() rune {
478                                 if h {
479                                         return 'H'
480                                 } else {
481                                         return '.'
482                                 }
483                         }())
484                 }
485                 fmt.Fprintln(w)
486         }
487         fmt.Fprintf(w, "Piece length: %s\n", func() string {
488                 if t.haveInfo() {
489                         return fmt.Sprint(t.usualPieceSize())
490                 } else {
491                         return "?"
492                 }
493         }())
494         if t.info != nil {
495                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
496                 fmt.Fprint(w, "Piece States:")
497                 for _, psr := range t.pieceStateRuns() {
498                         w.Write([]byte(" "))
499                         w.Write([]byte(pieceStateRunStatusChars(psr)))
500                 }
501                 fmt.Fprintln(w)
502         }
503         fmt.Fprintf(w, "Reader Pieces:")
504         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
505                 fmt.Fprintf(w, " %d:%d", begin, end)
506                 return true
507         })
508         fmt.Fprintln(w)
509
510         fmt.Fprintf(w, "Trackers:\n")
511         func() {
512                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
513                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
514                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
515                         return l.url < r.url
516                 }).([]*trackerScraper) {
517                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
518                 }
519                 tw.Flush()
520         }()
521
522         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
523
524         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
525         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
526         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
527         conns := t.connsAsSlice()
528         slices.Sort(conns, worseConn)
529         for i, c := range conns {
530                 fmt.Fprintf(w, "%2d. ", i+1)
531                 c.WriteStatus(w, t)
532         }
533 }
534
535 func (t *Torrent) haveInfo() bool {
536         return t.info != nil
537 }
538
539 // Returns a run-time generated MetaInfo that includes the info bytes and
540 // announce-list as currently known to the client.
541 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
542         return metainfo.MetaInfo{
543                 CreationDate: time.Now().Unix(),
544                 Comment:      "dynamic metainfo from client",
545                 CreatedBy:    "go.torrent",
546                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
547                 InfoBytes:    t.metadataBytes,
548         }
549 }
550
551 func (t *Torrent) BytesMissing() int64 {
552         t.mu().RLock()
553         defer t.mu().RUnlock()
554         return t.bytesMissingLocked()
555 }
556
557 func (t *Torrent) bytesMissingLocked() int64 {
558         return t.bytesLeft()
559 }
560
561 func (t *Torrent) bytesLeft() (left int64) {
562         bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
563                 p := t.pieces[piece]
564                 left += int64(p.length() - p.numDirtyBytes())
565                 return true
566         })
567         return
568 }
569
570 // Bytes left to give in tracker announces.
571 func (t *Torrent) bytesLeftAnnounce() uint64 {
572         if t.haveInfo() {
573                 return uint64(t.bytesLeft())
574         } else {
575                 return math.MaxUint64
576         }
577 }
578
579 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
580         if t.pieceComplete(piece) {
581                 return false
582         }
583         if t.pieceAllDirty(piece) {
584                 return false
585         }
586         return t.pieces[piece].hasDirtyChunks()
587 }
588
589 func (t *Torrent) usualPieceSize() int {
590         return int(t.info.PieceLength)
591 }
592
593 func (t *Torrent) numPieces() int {
594         return t.info.NumPieces()
595 }
596
597 func (t *Torrent) numPiecesCompleted() (num int) {
598         return t.completedPieces.Len()
599 }
600
601 func (t *Torrent) close() (err error) {
602         t.closed.Set()
603         if t.storage != nil {
604                 t.storage.Close()
605         }
606         for conn := range t.conns {
607                 conn.Close()
608         }
609         t.cl.event.Broadcast()
610         t.pieceStateChanges.Close()
611         t.updateWantPeersEvent()
612         return
613 }
614
615 func (t *Torrent) requestOffset(r request) int64 {
616         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
617 }
618
619 // Return the request that would include the given offset into the torrent
620 // data. Returns !ok if there is no such request.
621 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
622         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
623 }
624
625 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
626         tr := perf.NewTimer()
627
628         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
629         if err == nil && n != len(data) {
630                 err = io.ErrShortWrite
631         }
632         if err == nil {
633                 tr.Mark("write chunk")
634         }
635         return
636 }
637
638 func (t *Torrent) bitfield() (bf []bool) {
639         bf = make([]bool, t.numPieces())
640         t.completedPieces.IterTyped(func(piece int) (again bool) {
641                 bf[piece] = true
642                 return true
643         })
644         return
645 }
646
647 func (t *Torrent) pieceNumChunks(piece int) int {
648         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
649 }
650
651 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
652         t.pieces[pieceIndex].dirtyChunks.Clear()
653 }
654
655 type Peer struct {
656         Id     [20]byte
657         IP     net.IP
658         Port   int
659         Source peerSource
660         // Peer is known to support encryption.
661         SupportsEncryption bool
662 }
663
664 func (t *Torrent) pieceLength(piece int) pp.Integer {
665         if piece == t.numPieces()-1 {
666                 ret := pp.Integer(t.length % t.info.PieceLength)
667                 if ret != 0 {
668                         return ret
669                 }
670         }
671         return pp.Integer(t.info.PieceLength)
672 }
673
674 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
675         hash := pieceHash.New()
676         p := &t.pieces[piece]
677         p.waitNoPendingWrites()
678         ip := t.info.Piece(piece)
679         pl := ip.Length()
680         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
681         if n == pl {
682                 missinggo.CopyExact(&ret, hash.Sum(nil))
683                 return
684         }
685         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
686                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
687         }
688         return
689 }
690
691 func (t *Torrent) haveAnyPieces() bool {
692         for i := range t.pieces {
693                 if t.pieceComplete(i) {
694                         return true
695                 }
696         }
697         return false
698 }
699
700 func (t *Torrent) havePiece(index int) bool {
701         return t.haveInfo() && t.pieceComplete(index)
702 }
703
704 func (t *Torrent) haveChunk(r request) (ret bool) {
705         // defer func() {
706         //      log.Println("have chunk", r, ret)
707         // }()
708         if !t.haveInfo() {
709                 return false
710         }
711         if t.pieceComplete(int(r.Index)) {
712                 return true
713         }
714         p := &t.pieces[r.Index]
715         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
716 }
717
718 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
719         return int(cs.Begin / chunkSize)
720 }
721
722 func (t *Torrent) wantPiece(r request) bool {
723         if !t.wantPieceIndex(int(r.Index)) {
724                 return false
725         }
726         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
727                 return true
728         }
729         // TODO: What about pieces that were wanted, but aren't now, and aren't
730         // completed either? That used to be done here.
731         return false
732 }
733
734 func (t *Torrent) wantPieceIndex(index int) bool {
735         if !t.haveInfo() {
736                 return false
737         }
738         if index < 0 || index >= t.numPieces() {
739                 return false
740         }
741         p := &t.pieces[index]
742         if p.queuedForHash() {
743                 return false
744         }
745         if p.hashing {
746                 return false
747         }
748         if t.pieceComplete(index) {
749                 return false
750         }
751         if t.pendingPieces.Contains(index) {
752                 return true
753         }
754         return !t.forReaderOffsetPieces(func(begin, end int) bool {
755                 return index < begin || index >= end
756         })
757 }
758
759 // The worst connection is one that hasn't been sent, or sent anything useful
760 // for the longest. A bad connection is one that usually sends us unwanted
761 // pieces, or has been in worser half of the established connections for more
762 // than a minute.
763 func (t *Torrent) worstBadConn() *connection {
764         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
765         heap.Init(&wcs)
766         for wcs.Len() != 0 {
767                 c := heap.Pop(&wcs).(*connection)
768                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
769                         return c
770                 }
771                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
772                         // Give connections 1 minute to prove themselves.
773                         if time.Since(c.completedHandshake) > time.Minute {
774                                 return c
775                         }
776                 }
777         }
778         return nil
779 }
780
781 type PieceStateChange struct {
782         Index int
783         PieceState
784 }
785
786 func (t *Torrent) publishPieceChange(piece int) {
787         cur := t.pieceState(piece)
788         p := &t.pieces[piece]
789         if cur != p.publicPieceState {
790                 p.publicPieceState = cur
791                 t.pieceStateChanges.Publish(PieceStateChange{
792                         piece,
793                         cur,
794                 })
795         }
796 }
797
798 func (t *Torrent) pieceNumPendingChunks(piece int) int {
799         if t.pieceComplete(piece) {
800                 return 0
801         }
802         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
803 }
804
805 func (t *Torrent) pieceAllDirty(piece int) bool {
806         return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
807 }
808
809 func (t *Torrent) readersChanged() {
810         t.updateReaderPieces()
811         t.updateAllPiecePriorities()
812 }
813
814 func (t *Torrent) updateReaderPieces() {
815         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
816 }
817
818 func (t *Torrent) readerPosChanged(from, to pieceRange) {
819         if from == to {
820                 return
821         }
822         t.updateReaderPieces()
823         // Order the ranges, high and low.
824         l, h := from, to
825         if l.begin > h.begin {
826                 l, h = h, l
827         }
828         if l.end < h.begin {
829                 // Two distinct ranges.
830                 t.updatePiecePriorities(l.begin, l.end)
831                 t.updatePiecePriorities(h.begin, h.end)
832         } else {
833                 // Ranges overlap.
834                 end := l.end
835                 if h.end > end {
836                         end = h.end
837                 }
838                 t.updatePiecePriorities(l.begin, end)
839         }
840 }
841
842 func (t *Torrent) maybeNewConns() {
843         // Tickle the accept routine.
844         t.cl.event.Broadcast()
845         t.openNewConns()
846 }
847
848 func (t *Torrent) piecePriorityChanged(piece int) {
849         for c := range t.conns {
850                 if c.updatePiecePriority(piece) {
851                         c.updateRequests()
852                 }
853         }
854         t.maybeNewConns()
855         t.publishPieceChange(piece)
856 }
857
858 func (t *Torrent) updatePiecePriority(piece int) {
859         p := &t.pieces[piece]
860         newPrio := t.piecePriorityUncached(piece)
861         if newPrio == p.priority {
862                 return
863         }
864         p.priority = newPrio
865         t.piecePriorityChanged(piece)
866 }
867
868 func (t *Torrent) updateAllPiecePriorities() {
869         t.updatePiecePriorities(0, len(t.pieces))
870 }
871
872 // Update all piece priorities in one hit. This function should have the same
873 // output as updatePiecePriority, but across all pieces.
874 func (t *Torrent) updatePiecePriorities(begin, end int) {
875         for i := begin; i < end; i++ {
876                 t.updatePiecePriority(i)
877         }
878 }
879
880 // Returns the range of pieces [begin, end) that contains the extent of bytes.
881 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
882         if off >= t.length {
883                 return
884         }
885         if off < 0 {
886                 size += off
887                 off = 0
888         }
889         if size <= 0 {
890                 return
891         }
892         begin = int(off / t.info.PieceLength)
893         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
894         if end > t.info.NumPieces() {
895                 end = t.info.NumPieces()
896         }
897         return
898 }
899
900 // Returns true if all iterations complete without breaking. Returns the read
901 // regions for all readers. The reader regions should not be merged as some
902 // callers depend on this method to enumerate readers.
903 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
904         for r := range t.readers {
905                 p := r.pieces
906                 if p.begin >= p.end {
907                         continue
908                 }
909                 if !f(p.begin, p.end) {
910                         return false
911                 }
912         }
913         return true
914 }
915
916 func (t *Torrent) piecePriority(piece int) piecePriority {
917         if !t.haveInfo() {
918                 return PiecePriorityNone
919         }
920         return t.pieces[piece].priority
921 }
922
923 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
924         if t.pieceComplete(piece) {
925                 return PiecePriorityNone
926         }
927         if t.readerNowPieces.Contains(piece) {
928                 return PiecePriorityNow
929         }
930         // if t.readerNowPieces.Contains(piece - 1) {
931         //      return PiecePriorityNext
932         // }
933         if t.readerReadaheadPieces.Contains(piece) {
934                 return PiecePriorityReadahead
935         }
936         if t.pendingPieces.Contains(piece) {
937                 return PiecePriorityNormal
938         }
939         return PiecePriorityNone
940 }
941
942 func (t *Torrent) pendPiece(piece int) {
943         if t.pendingPieces.Contains(piece) {
944                 return
945         }
946         if t.havePiece(piece) {
947                 return
948         }
949         t.pendingPieces.Add(piece)
950         t.updatePiecePriority(piece)
951 }
952
953 func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) {
954         t.pendingPieces.Sub(unpend)
955         unpend.IterTyped(func(piece int) (again bool) {
956                 t.updatePiecePriority(piece)
957                 return true
958         })
959 }
960
961 func (t *Torrent) pendPieceRange(begin, end int) {
962         for i := begin; i < end; i++ {
963                 t.pendPiece(i)
964         }
965 }
966
967 func (t *Torrent) unpendPieceRange(begin, end int) {
968         var bm bitmap.Bitmap
969         bm.AddRange(begin, end)
970         t.unpendPieces(bm)
971 }
972
973 func (t *Torrent) pendRequest(req request) {
974         ci := chunkIndex(req.chunkSpec, t.chunkSize)
975         t.pieces[req.Index].pendChunkIndex(ci)
976 }
977
978 func (t *Torrent) pieceCompletionChanged(piece int) {
979         t.cl.event.Broadcast()
980         if t.pieceComplete(piece) {
981                 t.onPieceCompleted(piece)
982         } else {
983                 t.onIncompletePiece(piece)
984         }
985         t.updatePiecePriority(piece)
986 }
987
988 func (t *Torrent) openNewConns() {
989         t.cl.openNewConns(t)
990 }
991
992 func (t *Torrent) getConnPieceInclination() []int {
993         _ret := t.connPieceInclinationPool.Get()
994         if _ret == nil {
995                 pieceInclinationsNew.Add(1)
996                 return rand.Perm(t.numPieces())
997         }
998         pieceInclinationsReused.Add(1)
999         return _ret.([]int)
1000 }
1001
1002 func (t *Torrent) putPieceInclination(pi []int) {
1003         t.connPieceInclinationPool.Put(pi)
1004         pieceInclinationsPut.Add(1)
1005 }
1006
1007 func (t *Torrent) updatePieceCompletion(piece int) {
1008         pcu := t.pieceCompleteUncached(piece)
1009         p := &t.pieces[piece]
1010         changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1011         p.storageCompletionOk = pcu.Ok
1012         t.completedPieces.Set(piece, pcu.Complete)
1013         if changed {
1014                 t.pieceCompletionChanged(piece)
1015         }
1016 }
1017
1018 // Non-blocking read. Client lock is not required.
1019 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1020         p := &t.pieces[off/t.info.PieceLength]
1021         p.waitNoPendingWrites()
1022         return p.Storage().ReadAt(b, off-p.Info().Offset())
1023 }
1024
1025 func (t *Torrent) updateAllPieceCompletions() {
1026         for i := range iter.N(t.numPieces()) {
1027                 t.updatePieceCompletion(i)
1028         }
1029 }
1030
1031 // Returns an error if the metadata was completed, but couldn't be set for
1032 // some reason. Blame it on the last peer to contribute.
1033 func (t *Torrent) maybeCompleteMetadata() error {
1034         if t.haveInfo() {
1035                 // Nothing to do.
1036                 return nil
1037         }
1038         if !t.haveAllMetadataPieces() {
1039                 // Don't have enough metadata pieces.
1040                 return nil
1041         }
1042         err := t.setInfoBytes(t.metadataBytes)
1043         if err != nil {
1044                 t.invalidateMetadata()
1045                 return fmt.Errorf("error setting info bytes: %s", err)
1046         }
1047         if t.cl.config.Debug {
1048                 log.Printf("%s: got metadata from peers", t)
1049         }
1050         return nil
1051 }
1052
1053 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1054         t.forReaderOffsetPieces(func(begin, end int) bool {
1055                 ret.AddRange(begin, end)
1056                 return true
1057         })
1058         return
1059 }
1060
1061 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1062         t.forReaderOffsetPieces(func(begin, end int) bool {
1063                 if end > begin {
1064                         now.Add(begin)
1065                         readahead.AddRange(begin+1, end)
1066                 }
1067                 return true
1068         })
1069         return
1070 }
1071
1072 func (t *Torrent) needData() bool {
1073         if t.closed.IsSet() {
1074                 return false
1075         }
1076         if !t.haveInfo() {
1077                 return true
1078         }
1079         if t.pendingPieces.Len() != 0 {
1080                 return true
1081         }
1082         // Read as "not all complete".
1083         return !t.readerPieces().IterTyped(func(piece int) bool {
1084                 return t.pieceComplete(piece)
1085         })
1086 }
1087
1088 func appendMissingStrings(old, new []string) (ret []string) {
1089         ret = old
1090 new:
1091         for _, n := range new {
1092                 for _, o := range old {
1093                         if o == n {
1094                                 continue new
1095                         }
1096                 }
1097                 ret = append(ret, n)
1098         }
1099         return
1100 }
1101
1102 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1103         ret = existing
1104         for minNumTiers > len(ret) {
1105                 ret = append(ret, nil)
1106         }
1107         return
1108 }
1109
1110 func (t *Torrent) addTrackers(announceList [][]string) {
1111         fullAnnounceList := &t.metainfo.AnnounceList
1112         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1113         for tierIndex, trackerURLs := range announceList {
1114                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1115         }
1116         t.startMissingTrackerScrapers()
1117         t.updateWantPeersEvent()
1118 }
1119
1120 // Don't call this before the info is available.
1121 func (t *Torrent) bytesCompleted() int64 {
1122         if !t.haveInfo() {
1123                 return 0
1124         }
1125         return t.info.TotalLength() - t.bytesLeft()
1126 }
1127
1128 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1129         t.cl.mu.Lock()
1130         defer t.cl.mu.Unlock()
1131         return t.setInfoBytes(b)
1132 }
1133
1134 // Returns true if connection is removed from torrent.Conns.
1135 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1136         _, ret = t.conns[c]
1137         delete(t.conns, c)
1138         return
1139 }
1140
1141 func (t *Torrent) dropConnection(c *connection) {
1142         t.cl.event.Broadcast()
1143         c.Close()
1144         if t.deleteConnection(c) {
1145                 t.openNewConns()
1146         }
1147 }
1148
1149 func (t *Torrent) wantPeers() bool {
1150         if t.closed.IsSet() {
1151                 return false
1152         }
1153         if len(t.peers) > torrentPeersLowWater {
1154                 return false
1155         }
1156         return t.needData() || t.seeding()
1157 }
1158
1159 func (t *Torrent) updateWantPeersEvent() {
1160         if t.wantPeers() {
1161                 t.wantPeersEvent.Set()
1162         } else {
1163                 t.wantPeersEvent.Clear()
1164         }
1165 }
1166
1167 // Returns whether the client should make effort to seed the torrent.
1168 func (t *Torrent) seeding() bool {
1169         cl := t.cl
1170         if t.closed.IsSet() {
1171                 return false
1172         }
1173         if cl.config.NoUpload {
1174                 return false
1175         }
1176         if !cl.config.Seed {
1177                 return false
1178         }
1179         if t.needData() {
1180                 return false
1181         }
1182         return true
1183 }
1184
1185 func (t *Torrent) startScrapingTracker(url string) {
1186         if url == "" {
1187                 return
1188         }
1189         if _, ok := t.trackerAnnouncers[url]; ok {
1190                 return
1191         }
1192         newAnnouncer := &trackerScraper{
1193                 url: url,
1194                 t:   t,
1195         }
1196         if t.trackerAnnouncers == nil {
1197                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1198         }
1199         t.trackerAnnouncers[url] = newAnnouncer
1200         go newAnnouncer.Run()
1201 }
1202
1203 // Adds and starts tracker scrapers for tracker URLs that aren't already
1204 // running.
1205 func (t *Torrent) startMissingTrackerScrapers() {
1206         if t.cl.config.DisableTrackers {
1207                 return
1208         }
1209         t.startScrapingTracker(t.metainfo.Announce)
1210         for _, tier := range t.metainfo.AnnounceList {
1211                 for _, url := range tier {
1212                         t.startScrapingTracker(url)
1213                 }
1214         }
1215 }
1216
1217 // Returns an AnnounceRequest with fields filled out to defaults and current
1218 // values.
1219 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1220         return tracker.AnnounceRequest{
1221                 Event:    tracker.None,
1222                 NumWant:  -1,
1223                 Port:     uint16(t.cl.incomingPeerPort()),
1224                 PeerId:   t.cl.peerID,
1225                 InfoHash: t.infoHash,
1226                 Left:     t.bytesLeftAnnounce(),
1227         }
1228 }
1229
1230 // Adds peers revealed in an announce until the announce ends, or we have
1231 // enough peers.
1232 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1233         cl := t.cl
1234         // Count all the unique addresses we got during this announce.
1235         allAddrs := make(map[string]struct{})
1236         for {
1237                 select {
1238                 case v, ok := <-pvs:
1239                         if !ok {
1240                                 return
1241                         }
1242                         addPeers := make([]Peer, 0, len(v.Peers))
1243                         for _, cp := range v.Peers {
1244                                 if cp.Port == 0 {
1245                                         // Can't do anything with this.
1246                                         continue
1247                                 }
1248                                 addPeers = append(addPeers, Peer{
1249                                         IP:     cp.IP[:],
1250                                         Port:   cp.Port,
1251                                         Source: peerSourceDHTGetPeers,
1252                                 })
1253                                 key := (&net.UDPAddr{
1254                                         IP:   cp.IP[:],
1255                                         Port: cp.Port,
1256                                 }).String()
1257                                 allAddrs[key] = struct{}{}
1258                         }
1259                         cl.mu.Lock()
1260                         t.addPeers(addPeers)
1261                         numPeers := len(t.peers)
1262                         cl.mu.Unlock()
1263                         if numPeers >= torrentPeersHighWater {
1264                                 return
1265                         }
1266                 case <-t.closed.LockedChan(&cl.mu):
1267                         return
1268                 }
1269         }
1270 }
1271
1272 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1273         cl := t.cl
1274         ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1275         if err != nil {
1276                 return
1277         }
1278         t.consumeDHTAnnounce(ps.Peers)
1279         ps.Close()
1280         return
1281 }
1282
1283 func (t *Torrent) dhtAnnouncer() {
1284         cl := t.cl
1285         for {
1286                 select {
1287                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1288                 case <-t.closed.LockedChan(&cl.mu):
1289                         return
1290                 }
1291                 err := t.announceDHT(true)
1292                 func() {
1293                         cl.mu.Lock()
1294                         defer cl.mu.Unlock()
1295                         if err == nil {
1296                                 t.numDHTAnnounces++
1297                         } else {
1298                                 log.Printf("error announcing %q to DHT: %s", t, err)
1299                         }
1300                 }()
1301                 select {
1302                 case <-t.closed.LockedChan(&cl.mu):
1303                         return
1304                 case <-time.After(5 * time.Minute):
1305                 }
1306         }
1307 }
1308
1309 func (t *Torrent) addPeers(peers []Peer) {
1310         for _, p := range peers {
1311                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1312                         continue
1313                 }
1314                 t.addPeer(p)
1315         }
1316 }
1317
1318 func (t *Torrent) Stats() TorrentStats {
1319         t.cl.mu.Lock()
1320         defer t.cl.mu.Unlock()
1321
1322         t.stats.ActivePeers = len(t.conns)
1323         t.stats.HalfOpenPeers = len(t.halfOpen)
1324         t.stats.PendingPeers = len(t.peers)
1325         t.stats.TotalPeers = t.numTotalPeers()
1326
1327         return t.stats
1328 }
1329
1330 // The total number of peers in the torrent.
1331 func (t *Torrent) numTotalPeers() int {
1332         peers := make(map[string]struct{})
1333         for conn := range t.conns {
1334                 peers[conn.conn.RemoteAddr().String()] = struct{}{}
1335         }
1336         for addr := range t.halfOpen {
1337                 peers[addr] = struct{}{}
1338         }
1339         for _, peer := range t.peers {
1340                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1341         }
1342         return len(peers)
1343 }
1344
1345 // Returns true if the connection is added.
1346 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1347         if t.cl.closed.IsSet() {
1348                 return false
1349         }
1350         if !t.wantConns() {
1351                 return false
1352         }
1353         for c0 := range t.conns {
1354                 if c.PeerID == c0.PeerID {
1355                         // Already connected to a client with that ID.
1356                         duplicateClientConns.Add(1)
1357                         lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1358                         // Retain the connection from initiated from lower peer ID to
1359                         // higher.
1360                         if outgoing == lower {
1361                                 // Close the other one.
1362                                 c0.Close()
1363                                 // Is it safe to delete from the map while we're iterating
1364                                 // over it?
1365                                 t.deleteConnection(c0)
1366                         } else {
1367                                 // Abandon this one.
1368                                 return false
1369                         }
1370                 }
1371         }
1372         if len(t.conns) >= t.maxEstablishedConns {
1373                 c := t.worstBadConn()
1374                 if c == nil {
1375                         return false
1376                 }
1377                 if t.cl.config.Debug && missinggo.CryHeard() {
1378                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1379                 }
1380                 c.Close()
1381                 t.deleteConnection(c)
1382         }
1383         if len(t.conns) >= t.maxEstablishedConns {
1384                 panic(len(t.conns))
1385         }
1386         if c.t != nil {
1387                 panic("connection already associated with a torrent")
1388         }
1389         // Reconcile bytes transferred before connection was associated with a
1390         // torrent.
1391         t.stats.wroteBytes(c.stats.BytesWritten)
1392         t.stats.readBytes(c.stats.BytesRead)
1393         c.t = t
1394         t.conns[c] = struct{}{}
1395         return true
1396 }
1397
1398 func (t *Torrent) wantConns() bool {
1399         if !t.networkingEnabled {
1400                 return false
1401         }
1402         if t.closed.IsSet() {
1403                 return false
1404         }
1405         if !t.seeding() && !t.needData() {
1406                 return false
1407         }
1408         if len(t.conns) < t.maxEstablishedConns {
1409                 return true
1410         }
1411         return t.worstBadConn() != nil
1412 }
1413
1414 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1415         t.cl.mu.Lock()
1416         defer t.cl.mu.Unlock()
1417         oldMax = t.maxEstablishedConns
1418         t.maxEstablishedConns = max
1419         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1420         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1421                 t.dropConnection(wcs.Pop().(*connection))
1422         }
1423         t.openNewConns()
1424         return oldMax
1425 }
1426
1427 func (t *Torrent) mu() missinggo.RWLocker {
1428         return &t.cl.mu
1429 }
1430
1431 func (t *Torrent) pieceHashed(piece int, correct bool) {
1432         if t.closed.IsSet() {
1433                 return
1434         }
1435         p := &t.pieces[piece]
1436         touchers := t.reapPieceTouchers(piece)
1437         if p.everHashed {
1438                 // Don't score the first time a piece is hashed, it could be an
1439                 // initial check.
1440                 if correct {
1441                         pieceHashedCorrect.Add(1)
1442                 } else {
1443                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1444                         pieceHashedNotCorrect.Add(1)
1445                 }
1446         }
1447         p.everHashed = true
1448         if correct {
1449                 for _, c := range touchers {
1450                         c.goodPiecesDirtied++
1451                 }
1452                 err := p.Storage().MarkComplete()
1453                 if err != nil {
1454                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1455                 }
1456                 t.updatePieceCompletion(piece)
1457         } else {
1458                 if len(touchers) != 0 {
1459                         for _, c := range touchers {
1460                                 // Y u do dis peer?!
1461                                 c.badPiecesDirtied++
1462                         }
1463                         slices.Sort(touchers, connLessTrusted)
1464                         log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1465                                 for _, c := range touchers {
1466                                         ret = append(ret, c.netGoodPiecesDirtied())
1467                                 }
1468                                 return
1469                         }())
1470                         c := touchers[0]
1471                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1472                         c.Drop()
1473                 }
1474                 t.onIncompletePiece(piece)
1475         }
1476 }
1477
1478 func (t *Torrent) cancelRequestsForPiece(piece int) {
1479         for cn := range t.conns {
1480                 cn.tickleWriter()
1481         }
1482 }
1483
1484 func (t *Torrent) onPieceCompleted(piece int) {
1485         t.pendingPieces.Remove(piece)
1486         t.pendAllChunkSpecs(piece)
1487         t.cancelRequestsForPiece(piece)
1488         for conn := range t.conns {
1489                 conn.Have(piece)
1490         }
1491 }
1492
1493 func (t *Torrent) onIncompletePiece(piece int) {
1494         if t.pieceAllDirty(piece) {
1495                 t.pendAllChunkSpecs(piece)
1496         }
1497         if !t.wantPieceIndex(piece) {
1498                 return
1499         }
1500         // We could drop any connections that we told we have a piece that we
1501         // don't here. But there's a test failure, and it seems clients don't care
1502         // if you request pieces that you already claim to have. Pruning bad
1503         // connections might just remove any connections that aren't treating us
1504         // favourably anyway.
1505
1506         // for c := range t.conns {
1507         //      if c.sentHave(piece) {
1508         //              c.Drop()
1509         //      }
1510         // }
1511         for conn := range t.conns {
1512                 if conn.PeerHasPiece(piece) {
1513                         conn.updateRequests()
1514                 }
1515         }
1516 }
1517
1518 func (t *Torrent) verifyPiece(piece int) {
1519         cl := t.cl
1520         cl.mu.Lock()
1521         defer cl.mu.Unlock()
1522         p := &t.pieces[piece]
1523         defer func() {
1524                 p.numVerifies++
1525                 cl.event.Broadcast()
1526         }()
1527         for p.hashing || t.storage == nil {
1528                 cl.event.Wait()
1529         }
1530         if !p.t.piecesQueuedForHash.Remove(piece) {
1531                 panic("piece was not queued")
1532         }
1533         if t.closed.IsSet() || t.pieceComplete(piece) {
1534                 t.updatePiecePriority(piece)
1535                 log.Println("early return", t.closed.IsSet(), t.pieceComplete(piece))
1536                 return
1537         }
1538         p.hashing = true
1539         t.publishPieceChange(piece)
1540         cl.mu.Unlock()
1541         sum := t.hashPiece(piece)
1542         cl.mu.Lock()
1543         p.hashing = false
1544         t.pieceHashed(piece, sum == p.hash)
1545 }
1546
1547 // Return the connections that touched a piece, and clear the entry while
1548 // doing it.
1549 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1550         for c := range t.conns {
1551                 if _, ok := c.peerTouchedPieces[piece]; ok {
1552                         ret = append(ret, c)
1553                         delete(c.peerTouchedPieces, piece)
1554                 }
1555         }
1556         return
1557 }
1558
1559 func (t *Torrent) connsAsSlice() (ret []*connection) {
1560         for c := range t.conns {
1561                 ret = append(ret, c)
1562         }
1563         return
1564 }
1565
1566 // Currently doesn't really queue, but should in the future.
1567 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1568         piece := &t.pieces[pieceIndex]
1569         if piece.queuedForHash() {
1570                 return
1571         }
1572         t.piecesQueuedForHash.Add(pieceIndex)
1573         t.publishPieceChange(pieceIndex)
1574         go t.verifyPiece(pieceIndex)
1575 }
1576
1577 func (t *Torrent) VerifyData() {
1578         for i := range iter.N(t.NumPieces()) {
1579                 t.Piece(i).VerifyData()
1580         }
1581 }