]> Sergey Matveev's repositories - btrtrc.git/blob - torrent.go
Apply megacheck to torrent package
[btrtrc.git] / torrent.go
1 package torrent
2
3 import (
4         "container/heap"
5         "crypto/sha1"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math"
11         "math/rand"
12         "net"
13         "os"
14         "strconv"
15         "sync"
16         "text/tabwriter"
17         "time"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/missinggo"
21         "github.com/anacrolix/missinggo/bitmap"
22         "github.com/anacrolix/missinggo/perf"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/missinggo/slices"
25         "github.com/bradfitz/iter"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/metainfo"
29         pp "github.com/anacrolix/torrent/peer_protocol"
30         "github.com/anacrolix/torrent/storage"
31         "github.com/anacrolix/torrent/tracker"
32 )
33
34 func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
35         return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
36 }
37
38 type peersKey struct {
39         IPBytes string
40         Port    int
41 }
42
43 // Maintains state of torrent within a Client.
44 type Torrent struct {
45         cl *Client
46
47         networkingEnabled bool
48         requestStrategy   int
49
50         closed   missinggo.Event
51         infoHash metainfo.Hash
52         pieces   []Piece
53         // Values are the piece indices that changed.
54         pieceStateChanges *pubsub.PubSub
55         // The size of chunks to request from peers over the wire. This is
56         // normally 16KiB by convention these days.
57         chunkSize pp.Integer
58         chunkPool *sync.Pool
59         // Total length of the torrent in bytes. Stored because it's not O(1) to
60         // get this from the info dict.
61         length int64
62
63         // The storage to open when the info dict becomes available.
64         storageOpener *storage.Client
65         // Storage for torrent data.
66         storage *storage.Torrent
67
68         metainfo metainfo.MetaInfo
69
70         // The info dict. nil if we don't have it (yet).
71         info *metainfo.Info
72
73         // Active peer connections, running message stream loops.
74         conns               map[*connection]struct{}
75         maxEstablishedConns int
76         // Set of addrs to which we're attempting to connect. Connections are
77         // half-open until all handshakes are completed.
78         halfOpen    map[string]Peer
79         fastestConn *connection
80
81         // Reserve of peers to connect to. A peer can be both here and in the
82         // active connections if were told about the peer after connecting with
83         // them. That encourages us to reconnect to peers that are well known in
84         // the swarm.
85         peers          map[peersKey]Peer
86         wantPeersEvent missinggo.Event
87         // An announcer for each tracker URL.
88         trackerAnnouncers map[string]*trackerScraper
89         // How many times we've initiated a DHT announce. TODO: Move into stats.
90         numDHTAnnounces int
91
92         // Name used if the info name isn't available. Should be cleared when the
93         // Info does become available.
94         displayName string
95         // The bencoded bytes of the info dict. This is actively manipulated if
96         // the info bytes aren't initially available, and we try to fetch them
97         // from peers.
98         metadataBytes []byte
99         // Each element corresponds to the 16KiB metadata pieces. If true, we have
100         // received that piece.
101         metadataCompletedChunks []bool
102
103         // Set when .Info is obtained.
104         gotMetainfo missinggo.Event
105
106         readers               map[*Reader]struct{}
107         readerNowPieces       bitmap.Bitmap
108         readerReadaheadPieces bitmap.Bitmap
109
110         // The indexes of pieces we want with normal priority, that aren't
111         // currently available.
112         pendingPieces bitmap.Bitmap
113         // A cache of completed piece indices.
114         completedPieces bitmap.Bitmap
115         // Pieces that need to be hashed.
116         piecesQueuedForHash bitmap.Bitmap
117
118         // A pool of piece priorities []int for assignment to new connections.
119         // These "inclinations" are used to give connections preference for
120         // different pieces.
121         connPieceInclinationPool sync.Pool
122         // Torrent-level statistics.
123         stats TorrentStats
124 }
125
126 // Returns a channel that is closed when the Torrent is closed.
127 func (t *Torrent) Closed() <-chan struct{} {
128         return t.closed.LockedChan(&t.cl.mu)
129 }
130
131 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
132 // pending, and half-open peers.
133 func (t *Torrent) KnownSwarm() (ks []Peer) {
134         // Add pending peers to the list
135         for _, peer := range t.peers {
136                 ks = append(ks, peer)
137         }
138
139         // Add half-open peers to the list
140         for _, peer := range t.halfOpen {
141                 ks = append(ks, peer)
142         }
143
144         // Add active peers to the list
145         for conn := range t.conns {
146                 host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
147                 if err != nil {
148                         panic(err)
149                 }
150
151                 ip := net.ParseIP(host)
152                 port, err := strconv.Atoi(portString)
153                 if err != nil {
154                         panic(err)
155                 }
156
157                 ks = append(ks, Peer{
158                         Id:     conn.PeerID,
159                         IP:     ip,
160                         Port:   port,
161                         Source: conn.Discovery,
162                         // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
163                         // > But if we're not connected to them with an encrypted connection, I couldn't say
164                         // > what's appropriate. We can carry forward the SupportsEncryption value as we
165                         // > received it from trackers/DHT/PEX, or just use the encryption state for the
166                         // > connection. It's probably easiest to do the latter for now.
167                         // https://github.com/anacrolix/torrent/pull/188
168                         SupportsEncryption: conn.headerEncrypted,
169                 })
170         }
171
172         return
173 }
174
175 func (t *Torrent) setChunkSize(size pp.Integer) {
176         t.chunkSize = size
177         t.chunkPool = &sync.Pool{
178                 New: func() interface{} {
179                         b := make([]byte, size)
180                         return &b
181                 },
182         }
183 }
184
185 func (t *Torrent) setDisplayName(dn string) {
186         if t.haveInfo() {
187                 return
188         }
189         t.displayName = dn
190 }
191
192 func (t *Torrent) pieceComplete(piece int) bool {
193         return t.completedPieces.Get(piece)
194 }
195
196 func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
197         return t.pieces[piece].Storage().Completion()
198 }
199
200 // There's a connection to that address already.
201 func (t *Torrent) addrActive(addr string) bool {
202         if _, ok := t.halfOpen[addr]; ok {
203                 return true
204         }
205         for c := range t.conns {
206                 if c.remoteAddr().String() == addr {
207                         return true
208                 }
209         }
210         return false
211 }
212
213 func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
214         ret = make([]*connection, 0, len(t.conns))
215         for c := range t.conns {
216                 if !c.closed.IsSet() {
217                         ret = append(ret, c)
218                 }
219         }
220         return
221 }
222
223 func (t *Torrent) addPeer(p Peer) {
224         cl := t.cl
225         cl.openNewConns(t)
226         if len(t.peers) >= torrentPeersHighWater {
227                 return
228         }
229         key := peersKey{string(p.IP), p.Port}
230         if _, ok := t.peers[key]; ok {
231                 return
232         }
233         t.peers[key] = p
234         peersAddedBySource.Add(string(p.Source), 1)
235         cl.openNewConns(t)
236
237 }
238
239 func (t *Torrent) invalidateMetadata() {
240         for i := range t.metadataCompletedChunks {
241                 t.metadataCompletedChunks[i] = false
242         }
243         t.info = nil
244 }
245
246 func (t *Torrent) saveMetadataPiece(index int, data []byte) {
247         if t.haveInfo() {
248                 return
249         }
250         if index >= len(t.metadataCompletedChunks) {
251                 log.Printf("%s: ignoring metadata piece %d", t, index)
252                 return
253         }
254         copy(t.metadataBytes[(1<<14)*index:], data)
255         t.metadataCompletedChunks[index] = true
256 }
257
258 func (t *Torrent) metadataPieceCount() int {
259         return (len(t.metadataBytes) + (1 << 14) - 1) / (1 << 14)
260 }
261
262 func (t *Torrent) haveMetadataPiece(piece int) bool {
263         if t.haveInfo() {
264                 return (1<<14)*piece < len(t.metadataBytes)
265         } else {
266                 return piece < len(t.metadataCompletedChunks) && t.metadataCompletedChunks[piece]
267         }
268 }
269
270 func (t *Torrent) metadataSizeKnown() bool {
271         return t.metadataBytes != nil
272 }
273
274 func (t *Torrent) metadataSize() int {
275         return len(t.metadataBytes)
276 }
277
278 func infoPieceHashes(info *metainfo.Info) (ret []string) {
279         for i := 0; i < len(info.Pieces); i += sha1.Size {
280                 ret = append(ret, string(info.Pieces[i:i+sha1.Size]))
281         }
282         return
283 }
284
285 func (t *Torrent) makePieces() {
286         hashes := infoPieceHashes(t.info)
287         t.pieces = make([]Piece, len(hashes))
288         for i, hash := range hashes {
289                 piece := &t.pieces[i]
290                 piece.t = t
291                 piece.index = i
292                 piece.noPendingWrites.L = &piece.pendingWritesMutex
293                 missinggo.CopyExact(piece.hash[:], hash)
294         }
295 }
296
297 // Called when metadata for a torrent becomes available.
298 func (t *Torrent) setInfoBytes(b []byte) error {
299         if t.haveInfo() {
300                 return nil
301         }
302         if metainfo.HashBytes(b) != t.infoHash {
303                 return errors.New("info bytes have wrong hash")
304         }
305         var info metainfo.Info
306         err := bencode.Unmarshal(b, &info)
307         if err != nil {
308                 return fmt.Errorf("error unmarshalling info bytes: %s", err)
309         }
310         err = validateInfo(&info)
311         if err != nil {
312                 return fmt.Errorf("bad info: %s", err)
313         }
314         defer t.updateWantPeersEvent()
315         t.info = &info
316         t.displayName = "" // Save a few bytes lol.
317         t.cl.event.Broadcast()
318         t.gotMetainfo.Set()
319         t.storage, err = t.storageOpener.OpenTorrent(t.info, t.infoHash)
320         if err != nil {
321                 return fmt.Errorf("error opening torrent storage: %s", err)
322         }
323         t.length = 0
324         for _, f := range t.info.UpvertedFiles() {
325                 t.length += f.Length
326         }
327         t.metadataBytes = b
328         t.metadataCompletedChunks = nil
329         t.makePieces()
330         for conn := range t.conns {
331                 if err := conn.setNumPieces(t.numPieces()); err != nil {
332                         log.Printf("closing connection: %s", err)
333                         conn.Close()
334                 }
335         }
336         for i := range t.pieces {
337                 t.updatePieceCompletion(i)
338                 p := &t.pieces[i]
339                 if !p.storageCompletionOk {
340                         // log.Printf("piece %s completion unknown, queueing check", p)
341                         t.queuePieceCheck(i)
342                 }
343         }
344         return nil
345 }
346
347 func (t *Torrent) haveAllMetadataPieces() bool {
348         if t.haveInfo() {
349                 return true
350         }
351         if t.metadataCompletedChunks == nil {
352                 return false
353         }
354         for _, have := range t.metadataCompletedChunks {
355                 if !have {
356                         return false
357                 }
358         }
359         return true
360 }
361
362 // TODO: Propagate errors to disconnect peer.
363 func (t *Torrent) setMetadataSize(bytes int64) (err error) {
364         if t.haveInfo() {
365                 // We already know the correct metadata size.
366                 return
367         }
368         if bytes <= 0 || bytes > 10000000 { // 10MB, pulled from my ass.
369                 return errors.New("bad size")
370         }
371         if t.metadataBytes != nil && len(t.metadataBytes) == int(bytes) {
372                 return
373         }
374         t.metadataBytes = make([]byte, bytes)
375         t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
376         for c := range t.conns {
377                 c.requestPendingMetadata()
378         }
379         return
380 }
381
382 // The current working name for the torrent. Either the name in the info dict,
383 // or a display name given such as by the dn value in a magnet link, or "".
384 func (t *Torrent) name() string {
385         if t.haveInfo() {
386                 return t.info.Name
387         }
388         return t.displayName
389 }
390
391 func (t *Torrent) pieceState(index int) (ret PieceState) {
392         p := &t.pieces[index]
393         ret.Priority = t.piecePriority(index)
394         if t.pieceComplete(index) {
395                 ret.Complete = true
396         }
397         if p.queuedForHash() || p.hashing {
398                 ret.Checking = true
399         }
400         if !ret.Complete && t.piecePartiallyDownloaded(index) {
401                 ret.Partial = true
402         }
403         return
404 }
405
406 func (t *Torrent) metadataPieceSize(piece int) int {
407         return metadataPieceSize(len(t.metadataBytes), piece)
408 }
409
410 func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
411         d := map[string]int{
412                 "msg_type": msgType,
413                 "piece":    piece,
414         }
415         if data != nil {
416                 d["total_size"] = len(t.metadataBytes)
417         }
418         p, err := bencode.Marshal(d)
419         if err != nil {
420                 panic(err)
421         }
422         return pp.Message{
423                 Type:            pp.Extended,
424                 ExtendedID:      c.PeerExtensionIDs["ut_metadata"],
425                 ExtendedPayload: append(p, data...),
426         }
427 }
428
429 func (t *Torrent) pieceStateRuns() (ret []PieceStateRun) {
430         rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
431                 ret = append(ret, PieceStateRun{
432                         PieceState: el.(PieceState),
433                         Length:     int(count),
434                 })
435         })
436         for index := range t.pieces {
437                 rle.Append(t.pieceState(index), 1)
438         }
439         rle.Flush()
440         return
441 }
442
443 // Produces a small string representing a PieceStateRun.
444 func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
445         ret = fmt.Sprintf("%d", psr.Length)
446         ret += func() string {
447                 switch psr.Priority {
448                 case PiecePriorityNext:
449                         return "N"
450                 case PiecePriorityNormal:
451                         return "."
452                 case PiecePriorityReadahead:
453                         return "R"
454                 case PiecePriorityNow:
455                         return "!"
456                 default:
457                         return ""
458                 }
459         }()
460         if psr.Checking {
461                 ret += "H"
462         }
463         if psr.Partial {
464                 ret += "P"
465         }
466         if psr.Complete {
467                 ret += "C"
468         }
469         return
470 }
471
472 func (t *Torrent) writeStatus(w io.Writer) {
473         fmt.Fprintf(w, "Infohash: %s\n", t.infoHash.HexString())
474         fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
475         if !t.haveInfo() {
476                 fmt.Fprintf(w, "Metadata have: ")
477                 for _, h := range t.metadataCompletedChunks {
478                         fmt.Fprintf(w, "%c", func() rune {
479                                 if h {
480                                         return 'H'
481                                 } else {
482                                         return '.'
483                                 }
484                         }())
485                 }
486                 fmt.Fprintln(w)
487         }
488         fmt.Fprintf(w, "Piece length: %s\n", func() string {
489                 if t.haveInfo() {
490                         return fmt.Sprint(t.usualPieceSize())
491                 } else {
492                         return "?"
493                 }
494         }())
495         if t.info != nil {
496                 fmt.Fprintf(w, "Num Pieces: %d\n", t.numPieces())
497                 fmt.Fprint(w, "Piece States:")
498                 for _, psr := range t.pieceStateRuns() {
499                         w.Write([]byte(" "))
500                         w.Write([]byte(pieceStateRunStatusChars(psr)))
501                 }
502                 fmt.Fprintln(w)
503         }
504         fmt.Fprintf(w, "Reader Pieces:")
505         t.forReaderOffsetPieces(func(begin, end int) (again bool) {
506                 fmt.Fprintf(w, " %d:%d", begin, end)
507                 return true
508         })
509         fmt.Fprintln(w)
510
511         fmt.Fprintf(w, "Trackers:\n")
512         func() {
513                 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
514                 fmt.Fprintf(tw, "    URL\tNext announce\tLast announce\n")
515                 for _, ta := range slices.Sort(slices.FromMapElems(t.trackerAnnouncers), func(l, r *trackerScraper) bool {
516                         return l.url < r.url
517                 }).([]*trackerScraper) {
518                         fmt.Fprintf(tw, "    %s\n", ta.statusLine())
519                 }
520                 tw.Flush()
521         }()
522
523         fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces)
524
525         fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
526         fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
527         fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
528         conns := t.connsAsSlice()
529         slices.Sort(conns, worseConn)
530         for i, c := range conns {
531                 fmt.Fprintf(w, "%2d. ", i+1)
532                 c.WriteStatus(w, t)
533         }
534 }
535
536 func (t *Torrent) haveInfo() bool {
537         return t.info != nil
538 }
539
540 // Returns a run-time generated MetaInfo that includes the info bytes and
541 // announce-list as currently known to the client.
542 func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
543         return metainfo.MetaInfo{
544                 CreationDate: time.Now().Unix(),
545                 Comment:      "dynamic metainfo from client",
546                 CreatedBy:    "go.torrent",
547                 AnnounceList: t.metainfo.UpvertedAnnounceList(),
548                 InfoBytes:    t.metadataBytes,
549         }
550 }
551
552 func (t *Torrent) BytesMissing() int64 {
553         t.mu().RLock()
554         defer t.mu().RUnlock()
555         return t.bytesMissingLocked()
556 }
557
558 func (t *Torrent) bytesMissingLocked() int64 {
559         return t.bytesLeft()
560 }
561
562 func (t *Torrent) bytesLeft() (left int64) {
563         bitmap.Flip(t.completedPieces, 0, t.numPieces()).IterTyped(func(piece int) bool {
564                 p := t.pieces[piece]
565                 left += int64(p.length() - p.numDirtyBytes())
566                 return true
567         })
568         return
569 }
570
571 // Bytes left to give in tracker announces.
572 func (t *Torrent) bytesLeftAnnounce() uint64 {
573         if t.haveInfo() {
574                 return uint64(t.bytesLeft())
575         } else {
576                 return math.MaxUint64
577         }
578 }
579
580 func (t *Torrent) piecePartiallyDownloaded(piece int) bool {
581         if t.pieceComplete(piece) {
582                 return false
583         }
584         if t.pieceAllDirty(piece) {
585                 return false
586         }
587         return t.pieces[piece].hasDirtyChunks()
588 }
589
590 func (t *Torrent) usualPieceSize() int {
591         return int(t.info.PieceLength)
592 }
593
594 func (t *Torrent) numPieces() int {
595         return t.info.NumPieces()
596 }
597
598 func (t *Torrent) numPiecesCompleted() (num int) {
599         return t.completedPieces.Len()
600 }
601
602 func (t *Torrent) close() (err error) {
603         t.closed.Set()
604         if t.storage != nil {
605                 t.storage.Close()
606         }
607         for conn := range t.conns {
608                 conn.Close()
609         }
610         t.cl.event.Broadcast()
611         t.pieceStateChanges.Close()
612         t.updateWantPeersEvent()
613         return
614 }
615
616 func (t *Torrent) requestOffset(r request) int64 {
617         return torrentRequestOffset(t.length, int64(t.usualPieceSize()), r)
618 }
619
620 // Return the request that would include the given offset into the torrent
621 // data. Returns !ok if there is no such request.
622 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
623         return torrentOffsetRequest(t.length, t.info.PieceLength, int64(t.chunkSize), off)
624 }
625
626 func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
627         tr := perf.NewTimer()
628
629         n, err := t.pieces[piece].Storage().WriteAt(data, begin)
630         if err == nil && n != len(data) {
631                 err = io.ErrShortWrite
632         }
633         if err == nil {
634                 tr.Mark("write chunk")
635         }
636         return
637 }
638
639 func (t *Torrent) bitfield() (bf []bool) {
640         bf = make([]bool, t.numPieces())
641         t.completedPieces.IterTyped(func(piece int) (again bool) {
642                 bf[piece] = true
643                 return true
644         })
645         return
646 }
647
648 func (t *Torrent) pieceNumChunks(piece int) int {
649         return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
650 }
651
652 func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
653         t.pieces[pieceIndex].dirtyChunks.Clear()
654 }
655
656 type Peer struct {
657         Id     [20]byte
658         IP     net.IP
659         Port   int
660         Source peerSource
661         // Peer is known to support encryption.
662         SupportsEncryption bool
663 }
664
665 func (t *Torrent) pieceLength(piece int) pp.Integer {
666         if piece == t.numPieces()-1 {
667                 ret := pp.Integer(t.length % t.info.PieceLength)
668                 if ret != 0 {
669                         return ret
670                 }
671         }
672         return pp.Integer(t.info.PieceLength)
673 }
674
675 func (t *Torrent) hashPiece(piece int) (ret metainfo.Hash) {
676         hash := pieceHash.New()
677         p := &t.pieces[piece]
678         p.waitNoPendingWrites()
679         ip := t.info.Piece(piece)
680         pl := ip.Length()
681         n, err := io.Copy(hash, io.NewSectionReader(t.pieces[piece].Storage(), 0, pl))
682         if n == pl {
683                 missinggo.CopyExact(&ret, hash.Sum(nil))
684                 return
685         }
686         if err != io.ErrUnexpectedEOF && !os.IsNotExist(err) {
687                 log.Printf("unexpected error hashing piece with %T: %s", t.storage.TorrentImpl, err)
688         }
689         return
690 }
691
692 func (t *Torrent) haveAnyPieces() bool {
693         for i := range t.pieces {
694                 if t.pieceComplete(i) {
695                         return true
696                 }
697         }
698         return false
699 }
700
701 func (t *Torrent) havePiece(index int) bool {
702         return t.haveInfo() && t.pieceComplete(index)
703 }
704
705 func (t *Torrent) haveChunk(r request) (ret bool) {
706         // defer func() {
707         //      log.Println("have chunk", r, ret)
708         // }()
709         if !t.haveInfo() {
710                 return false
711         }
712         if t.pieceComplete(int(r.Index)) {
713                 return true
714         }
715         p := &t.pieces[r.Index]
716         return !p.pendingChunk(r.chunkSpec, t.chunkSize)
717 }
718
719 func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
720         return int(cs.Begin / chunkSize)
721 }
722
723 func (t *Torrent) wantPiece(r request) bool {
724         if !t.wantPieceIndex(int(r.Index)) {
725                 return false
726         }
727         if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
728                 return true
729         }
730         // TODO: What about pieces that were wanted, but aren't now, and aren't
731         // completed either? That used to be done here.
732         return false
733 }
734
735 func (t *Torrent) wantPieceIndex(index int) bool {
736         if !t.haveInfo() {
737                 return false
738         }
739         if index < 0 || index >= t.numPieces() {
740                 return false
741         }
742         p := &t.pieces[index]
743         if p.queuedForHash() {
744                 return false
745         }
746         if p.hashing {
747                 return false
748         }
749         if t.pieceComplete(index) {
750                 return false
751         }
752         if t.pendingPieces.Contains(index) {
753                 return true
754         }
755         return !t.forReaderOffsetPieces(func(begin, end int) bool {
756                 return index < begin || index >= end
757         })
758 }
759
760 // The worst connection is one that hasn't been sent, or sent anything useful
761 // for the longest. A bad connection is one that usually sends us unwanted
762 // pieces, or has been in worser half of the established connections for more
763 // than a minute.
764 func (t *Torrent) worstBadConn() *connection {
765         wcs := worseConnSlice{t.unclosedConnsAsSlice()}
766         heap.Init(&wcs)
767         for wcs.Len() != 0 {
768                 c := heap.Pop(&wcs).(*connection)
769                 if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
770                         return c
771                 }
772                 if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
773                         // Give connections 1 minute to prove themselves.
774                         if time.Since(c.completedHandshake) > time.Minute {
775                                 return c
776                         }
777                 }
778         }
779         return nil
780 }
781
782 type PieceStateChange struct {
783         Index int
784         PieceState
785 }
786
787 func (t *Torrent) publishPieceChange(piece int) {
788         cur := t.pieceState(piece)
789         p := &t.pieces[piece]
790         if cur != p.publicPieceState {
791                 p.publicPieceState = cur
792                 t.pieceStateChanges.Publish(PieceStateChange{
793                         piece,
794                         cur,
795                 })
796         }
797 }
798
799 func (t *Torrent) pieceNumPendingChunks(piece int) int {
800         if t.pieceComplete(piece) {
801                 return 0
802         }
803         return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
804 }
805
806 func (t *Torrent) pieceAllDirty(piece int) bool {
807         return t.pieces[piece].dirtyChunks.Len() == t.pieceNumChunks(piece)
808 }
809
810 func (t *Torrent) readersChanged() {
811         t.updateReaderPieces()
812         t.updateAllPiecePriorities()
813 }
814
815 func (t *Torrent) updateReaderPieces() {
816         t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities()
817 }
818
819 func (t *Torrent) readerPosChanged(from, to pieceRange) {
820         if from == to {
821                 return
822         }
823         t.updateReaderPieces()
824         // Order the ranges, high and low.
825         l, h := from, to
826         if l.begin > h.begin {
827                 l, h = h, l
828         }
829         if l.end < h.begin {
830                 // Two distinct ranges.
831                 t.updatePiecePriorities(l.begin, l.end)
832                 t.updatePiecePriorities(h.begin, h.end)
833         } else {
834                 // Ranges overlap.
835                 end := l.end
836                 if h.end > end {
837                         end = h.end
838                 }
839                 t.updatePiecePriorities(l.begin, end)
840         }
841 }
842
843 func (t *Torrent) maybeNewConns() {
844         // Tickle the accept routine.
845         t.cl.event.Broadcast()
846         t.openNewConns()
847 }
848
849 func (t *Torrent) piecePriorityChanged(piece int) {
850         for c := range t.conns {
851                 if c.updatePiecePriority(piece) {
852                         c.updateRequests()
853                 }
854         }
855         t.maybeNewConns()
856         t.publishPieceChange(piece)
857 }
858
859 func (t *Torrent) updatePiecePriority(piece int) {
860         p := &t.pieces[piece]
861         newPrio := t.piecePriorityUncached(piece)
862         if newPrio == p.priority {
863                 return
864         }
865         p.priority = newPrio
866         t.piecePriorityChanged(piece)
867 }
868
869 func (t *Torrent) updateAllPiecePriorities() {
870         t.updatePiecePriorities(0, len(t.pieces))
871 }
872
873 // Update all piece priorities in one hit. This function should have the same
874 // output as updatePiecePriority, but across all pieces.
875 func (t *Torrent) updatePiecePriorities(begin, end int) {
876         for i := begin; i < end; i++ {
877                 t.updatePiecePriority(i)
878         }
879 }
880
881 // Returns the range of pieces [begin, end) that contains the extent of bytes.
882 func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
883         if off >= t.length {
884                 return
885         }
886         if off < 0 {
887                 size += off
888                 off = 0
889         }
890         if size <= 0 {
891                 return
892         }
893         begin = int(off / t.info.PieceLength)
894         end = int((off + size + t.info.PieceLength - 1) / t.info.PieceLength)
895         if end > t.info.NumPieces() {
896                 end = t.info.NumPieces()
897         }
898         return
899 }
900
901 // Returns true if all iterations complete without breaking. Returns the read
902 // regions for all readers. The reader regions should not be merged as some
903 // callers depend on this method to enumerate readers.
904 func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
905         for r := range t.readers {
906                 p := r.pieces
907                 if p.begin >= p.end {
908                         continue
909                 }
910                 if !f(p.begin, p.end) {
911                         return false
912                 }
913         }
914         return true
915 }
916
917 func (t *Torrent) piecePriority(piece int) piecePriority {
918         if !t.haveInfo() {
919                 return PiecePriorityNone
920         }
921         return t.pieces[piece].priority
922 }
923
924 func (t *Torrent) piecePriorityUncached(piece int) piecePriority {
925         if t.pieceComplete(piece) {
926                 return PiecePriorityNone
927         }
928         if t.readerNowPieces.Contains(piece) {
929                 return PiecePriorityNow
930         }
931         // if t.readerNowPieces.Contains(piece - 1) {
932         //      return PiecePriorityNext
933         // }
934         if t.readerReadaheadPieces.Contains(piece) {
935                 return PiecePriorityReadahead
936         }
937         if t.pendingPieces.Contains(piece) {
938                 return PiecePriorityNormal
939         }
940         return PiecePriorityNone
941 }
942
943 func (t *Torrent) pendPiece(piece int) {
944         if t.pendingPieces.Contains(piece) {
945                 return
946         }
947         if t.havePiece(piece) {
948                 return
949         }
950         t.pendingPieces.Add(piece)
951         t.updatePiecePriority(piece)
952 }
953
954 func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) {
955         t.pendingPieces.Sub(unpend)
956         unpend.IterTyped(func(piece int) (again bool) {
957                 t.updatePiecePriority(piece)
958                 return true
959         })
960 }
961
962 func (t *Torrent) pendPieceRange(begin, end int) {
963         for i := begin; i < end; i++ {
964                 t.pendPiece(i)
965         }
966 }
967
968 func (t *Torrent) unpendPieceRange(begin, end int) {
969         var bm bitmap.Bitmap
970         bm.AddRange(begin, end)
971         t.unpendPieces(bm)
972 }
973
974 func (t *Torrent) pendRequest(req request) {
975         ci := chunkIndex(req.chunkSpec, t.chunkSize)
976         t.pieces[req.Index].pendChunkIndex(ci)
977 }
978
979 func (t *Torrent) pieceCompletionChanged(piece int) {
980         t.cl.event.Broadcast()
981         if t.pieceComplete(piece) {
982                 t.onPieceCompleted(piece)
983         } else {
984                 t.onIncompletePiece(piece)
985         }
986         t.updatePiecePriority(piece)
987 }
988
989 func (t *Torrent) openNewConns() {
990         t.cl.openNewConns(t)
991 }
992
993 func (t *Torrent) getConnPieceInclination() []int {
994         _ret := t.connPieceInclinationPool.Get()
995         if _ret == nil {
996                 pieceInclinationsNew.Add(1)
997                 return rand.Perm(t.numPieces())
998         }
999         pieceInclinationsReused.Add(1)
1000         return _ret.([]int)
1001 }
1002
1003 func (t *Torrent) putPieceInclination(pi []int) {
1004         t.connPieceInclinationPool.Put(&pi)
1005         pieceInclinationsPut.Add(1)
1006 }
1007
1008 func (t *Torrent) updatePieceCompletion(piece int) {
1009         pcu := t.pieceCompleteUncached(piece)
1010         p := &t.pieces[piece]
1011         changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
1012         p.storageCompletionOk = pcu.Ok
1013         t.completedPieces.Set(piece, pcu.Complete)
1014         if changed {
1015                 t.pieceCompletionChanged(piece)
1016         }
1017 }
1018
1019 // Non-blocking read. Client lock is not required.
1020 func (t *Torrent) readAt(b []byte, off int64) (n int, err error) {
1021         p := &t.pieces[off/t.info.PieceLength]
1022         p.waitNoPendingWrites()
1023         return p.Storage().ReadAt(b, off-p.Info().Offset())
1024 }
1025
1026 func (t *Torrent) updateAllPieceCompletions() {
1027         for i := range iter.N(t.numPieces()) {
1028                 t.updatePieceCompletion(i)
1029         }
1030 }
1031
1032 // Returns an error if the metadata was completed, but couldn't be set for
1033 // some reason. Blame it on the last peer to contribute.
1034 func (t *Torrent) maybeCompleteMetadata() error {
1035         if t.haveInfo() {
1036                 // Nothing to do.
1037                 return nil
1038         }
1039         if !t.haveAllMetadataPieces() {
1040                 // Don't have enough metadata pieces.
1041                 return nil
1042         }
1043         err := t.setInfoBytes(t.metadataBytes)
1044         if err != nil {
1045                 t.invalidateMetadata()
1046                 return fmt.Errorf("error setting info bytes: %s", err)
1047         }
1048         if t.cl.config.Debug {
1049                 log.Printf("%s: got metadata from peers", t)
1050         }
1051         return nil
1052 }
1053
1054 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1055         t.forReaderOffsetPieces(func(begin, end int) bool {
1056                 ret.AddRange(begin, end)
1057                 return true
1058         })
1059         return
1060 }
1061
1062 func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
1063         t.forReaderOffsetPieces(func(begin, end int) bool {
1064                 if end > begin {
1065                         now.Add(begin)
1066                         readahead.AddRange(begin+1, end)
1067                 }
1068                 return true
1069         })
1070         return
1071 }
1072
1073 func (t *Torrent) needData() bool {
1074         if t.closed.IsSet() {
1075                 return false
1076         }
1077         if !t.haveInfo() {
1078                 return true
1079         }
1080         if t.pendingPieces.Len() != 0 {
1081                 return true
1082         }
1083         // Read as "not all complete".
1084         return !t.readerPieces().IterTyped(func(piece int) bool {
1085                 return t.pieceComplete(piece)
1086         })
1087 }
1088
1089 func appendMissingStrings(old, new []string) (ret []string) {
1090         ret = old
1091 new:
1092         for _, n := range new {
1093                 for _, o := range old {
1094                         if o == n {
1095                                 continue new
1096                         }
1097                 }
1098                 ret = append(ret, n)
1099         }
1100         return
1101 }
1102
1103 func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
1104         ret = existing
1105         for minNumTiers > len(ret) {
1106                 ret = append(ret, nil)
1107         }
1108         return
1109 }
1110
1111 func (t *Torrent) addTrackers(announceList [][]string) {
1112         fullAnnounceList := &t.metainfo.AnnounceList
1113         t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
1114         for tierIndex, trackerURLs := range announceList {
1115                 (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
1116         }
1117         t.startMissingTrackerScrapers()
1118         t.updateWantPeersEvent()
1119 }
1120
1121 // Don't call this before the info is available.
1122 func (t *Torrent) bytesCompleted() int64 {
1123         if !t.haveInfo() {
1124                 return 0
1125         }
1126         return t.info.TotalLength() - t.bytesLeft()
1127 }
1128
1129 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
1130         t.cl.mu.Lock()
1131         defer t.cl.mu.Unlock()
1132         return t.setInfoBytes(b)
1133 }
1134
1135 // Returns true if connection is removed from torrent.Conns.
1136 func (t *Torrent) deleteConnection(c *connection) (ret bool) {
1137         _, ret = t.conns[c]
1138         delete(t.conns, c)
1139         return
1140 }
1141
1142 func (t *Torrent) dropConnection(c *connection) {
1143         t.cl.event.Broadcast()
1144         c.Close()
1145         if t.deleteConnection(c) {
1146                 t.openNewConns()
1147         }
1148 }
1149
1150 func (t *Torrent) wantPeers() bool {
1151         if t.closed.IsSet() {
1152                 return false
1153         }
1154         if len(t.peers) > torrentPeersLowWater {
1155                 return false
1156         }
1157         return t.needData() || t.seeding()
1158 }
1159
1160 func (t *Torrent) updateWantPeersEvent() {
1161         if t.wantPeers() {
1162                 t.wantPeersEvent.Set()
1163         } else {
1164                 t.wantPeersEvent.Clear()
1165         }
1166 }
1167
1168 // Returns whether the client should make effort to seed the torrent.
1169 func (t *Torrent) seeding() bool {
1170         cl := t.cl
1171         if t.closed.IsSet() {
1172                 return false
1173         }
1174         if cl.config.NoUpload {
1175                 return false
1176         }
1177         if !cl.config.Seed {
1178                 return false
1179         }
1180         if cl.config.DisableAggressiveUpload && t.needData() {
1181                 return false
1182         }
1183         return true
1184 }
1185
1186 func (t *Torrent) startScrapingTracker(url string) {
1187         if url == "" {
1188                 return
1189         }
1190         if _, ok := t.trackerAnnouncers[url]; ok {
1191                 return
1192         }
1193         newAnnouncer := &trackerScraper{
1194                 url: url,
1195                 t:   t,
1196         }
1197         if t.trackerAnnouncers == nil {
1198                 t.trackerAnnouncers = make(map[string]*trackerScraper)
1199         }
1200         t.trackerAnnouncers[url] = newAnnouncer
1201         go newAnnouncer.Run()
1202 }
1203
1204 // Adds and starts tracker scrapers for tracker URLs that aren't already
1205 // running.
1206 func (t *Torrent) startMissingTrackerScrapers() {
1207         if t.cl.config.DisableTrackers {
1208                 return
1209         }
1210         t.startScrapingTracker(t.metainfo.Announce)
1211         for _, tier := range t.metainfo.AnnounceList {
1212                 for _, url := range tier {
1213                         t.startScrapingTracker(url)
1214                 }
1215         }
1216 }
1217
1218 // Returns an AnnounceRequest with fields filled out to defaults and current
1219 // values.
1220 func (t *Torrent) announceRequest() tracker.AnnounceRequest {
1221         return tracker.AnnounceRequest{
1222                 Event:    tracker.None,
1223                 NumWant:  -1,
1224                 Port:     uint16(t.cl.incomingPeerPort()),
1225                 PeerId:   t.cl.peerID,
1226                 InfoHash: t.infoHash,
1227                 Left:     t.bytesLeftAnnounce(),
1228         }
1229 }
1230
1231 // Adds peers revealed in an announce until the announce ends, or we have
1232 // enough peers.
1233 func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
1234         cl := t.cl
1235         // Count all the unique addresses we got during this announce.
1236         allAddrs := make(map[string]struct{})
1237         for {
1238                 select {
1239                 case v, ok := <-pvs:
1240                         if !ok {
1241                                 return
1242                         }
1243                         addPeers := make([]Peer, 0, len(v.Peers))
1244                         for _, cp := range v.Peers {
1245                                 if cp.Port == 0 {
1246                                         // Can't do anything with this.
1247                                         continue
1248                                 }
1249                                 addPeers = append(addPeers, Peer{
1250                                         IP:     cp.IP[:],
1251                                         Port:   cp.Port,
1252                                         Source: peerSourceDHTGetPeers,
1253                                 })
1254                                 key := (&net.UDPAddr{
1255                                         IP:   cp.IP[:],
1256                                         Port: cp.Port,
1257                                 }).String()
1258                                 allAddrs[key] = struct{}{}
1259                         }
1260                         cl.mu.Lock()
1261                         t.addPeers(addPeers)
1262                         numPeers := len(t.peers)
1263                         cl.mu.Unlock()
1264                         if numPeers >= torrentPeersHighWater {
1265                                 return
1266                         }
1267                 case <-t.closed.LockedChan(&cl.mu):
1268                         return
1269                 }
1270         }
1271 }
1272
1273 func (t *Torrent) announceDHT(impliedPort bool) (err error) {
1274         cl := t.cl
1275         ps, err := cl.dHT.Announce(t.infoHash, cl.incomingPeerPort(), impliedPort)
1276         if err != nil {
1277                 return
1278         }
1279         t.consumeDHTAnnounce(ps.Peers)
1280         ps.Close()
1281         return
1282 }
1283
1284 func (t *Torrent) dhtAnnouncer() {
1285         cl := t.cl
1286         for {
1287                 select {
1288                 case <-t.wantPeersEvent.LockedChan(&cl.mu):
1289                 case <-t.closed.LockedChan(&cl.mu):
1290                         return
1291                 }
1292                 err := t.announceDHT(true)
1293                 func() {
1294                         cl.mu.Lock()
1295                         defer cl.mu.Unlock()
1296                         if err == nil {
1297                                 t.numDHTAnnounces++
1298                         } else {
1299                                 log.Printf("error announcing %q to DHT: %s", t, err)
1300                         }
1301                 }()
1302                 select {
1303                 case <-t.closed.LockedChan(&cl.mu):
1304                         return
1305                 case <-time.After(5 * time.Minute):
1306                 }
1307         }
1308 }
1309
1310 func (t *Torrent) addPeers(peers []Peer) {
1311         for _, p := range peers {
1312                 if t.cl.badPeerIPPort(p.IP, p.Port) {
1313                         continue
1314                 }
1315                 t.addPeer(p)
1316         }
1317 }
1318
1319 func (t *Torrent) Stats() TorrentStats {
1320         t.cl.mu.Lock()
1321         defer t.cl.mu.Unlock()
1322
1323         t.stats.ActivePeers = len(t.conns)
1324         t.stats.HalfOpenPeers = len(t.halfOpen)
1325         t.stats.PendingPeers = len(t.peers)
1326         t.stats.TotalPeers = t.numTotalPeers()
1327
1328         return t.stats
1329 }
1330
1331 // The total number of peers in the torrent.
1332 func (t *Torrent) numTotalPeers() int {
1333         peers := make(map[string]struct{})
1334         for conn := range t.conns {
1335                 peers[conn.conn.RemoteAddr().String()] = struct{}{}
1336         }
1337         for addr := range t.halfOpen {
1338                 peers[addr] = struct{}{}
1339         }
1340         for _, peer := range t.peers {
1341                 peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
1342         }
1343         return len(peers)
1344 }
1345
1346 // Returns true if the connection is added.
1347 func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
1348         if t.cl.closed.IsSet() {
1349                 return false
1350         }
1351         if !t.wantConns() {
1352                 return false
1353         }
1354         for c0 := range t.conns {
1355                 if c.PeerID == c0.PeerID {
1356                         // Already connected to a client with that ID.
1357                         duplicateClientConns.Add(1)
1358                         lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
1359                         // Retain the connection from initiated from lower peer ID to
1360                         // higher.
1361                         if outgoing == lower {
1362                                 // Close the other one.
1363                                 c0.Close()
1364                                 // Is it safe to delete from the map while we're iterating
1365                                 // over it?
1366                                 t.deleteConnection(c0)
1367                         } else {
1368                                 // Abandon this one.
1369                                 return false
1370                         }
1371                 }
1372         }
1373         if len(t.conns) >= t.maxEstablishedConns {
1374                 c := t.worstBadConn()
1375                 if c == nil {
1376                         return false
1377                 }
1378                 if t.cl.config.Debug && missinggo.CryHeard() {
1379                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1380                 }
1381                 c.Close()
1382                 t.deleteConnection(c)
1383         }
1384         if len(t.conns) >= t.maxEstablishedConns {
1385                 panic(len(t.conns))
1386         }
1387         if c.t != nil {
1388                 panic("connection already associated with a torrent")
1389         }
1390         // Reconcile bytes transferred before connection was associated with a
1391         // torrent.
1392         t.stats.wroteBytes(c.stats.BytesWritten)
1393         t.stats.readBytes(c.stats.BytesRead)
1394         c.t = t
1395         t.conns[c] = struct{}{}
1396         return true
1397 }
1398
1399 func (t *Torrent) wantConns() bool {
1400         if !t.networkingEnabled {
1401                 return false
1402         }
1403         if t.closed.IsSet() {
1404                 return false
1405         }
1406         if !t.seeding() && !t.needData() {
1407                 return false
1408         }
1409         if len(t.conns) < t.maxEstablishedConns {
1410                 return true
1411         }
1412         return t.worstBadConn() != nil
1413 }
1414
1415 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
1416         t.cl.mu.Lock()
1417         defer t.cl.mu.Unlock()
1418         oldMax = t.maxEstablishedConns
1419         t.maxEstablishedConns = max
1420         wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
1421         for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
1422                 t.dropConnection(wcs.Pop().(*connection))
1423         }
1424         t.openNewConns()
1425         return oldMax
1426 }
1427
1428 func (t *Torrent) mu() missinggo.RWLocker {
1429         return &t.cl.mu
1430 }
1431
1432 func (t *Torrent) pieceHashed(piece int, correct bool) {
1433         if t.closed.IsSet() {
1434                 return
1435         }
1436         p := &t.pieces[piece]
1437         touchers := t.reapPieceTouchers(piece)
1438         if p.everHashed {
1439                 // Don't score the first time a piece is hashed, it could be an
1440                 // initial check.
1441                 if correct {
1442                         pieceHashedCorrect.Add(1)
1443                 } else {
1444                         log.Printf("%s: piece %d (%s) failed hash: %d connections contributed", t, piece, p.hash, len(touchers))
1445                         pieceHashedNotCorrect.Add(1)
1446                 }
1447         }
1448         p.everHashed = true
1449         if correct {
1450                 for _, c := range touchers {
1451                         c.goodPiecesDirtied++
1452                 }
1453                 err := p.Storage().MarkComplete()
1454                 if err != nil {
1455                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1456                 }
1457                 t.updatePieceCompletion(piece)
1458         } else {
1459                 if len(touchers) != 0 {
1460                         for _, c := range touchers {
1461                                 // Y u do dis peer?!
1462                                 c.badPiecesDirtied++
1463                         }
1464                         slices.Sort(touchers, connLessTrusted)
1465                         log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
1466                                 for _, c := range touchers {
1467                                         ret = append(ret, c.netGoodPiecesDirtied())
1468                                 }
1469                                 return
1470                         }())
1471                         c := touchers[0]
1472                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1473                         c.Drop()
1474                 }
1475                 t.onIncompletePiece(piece)
1476         }
1477 }
1478
1479 func (t *Torrent) cancelRequestsForPiece(piece int) {
1480         // TODO: Make faster
1481         for cn := range t.conns {
1482                 cn.tickleWriter()
1483         }
1484 }
1485
1486 func (t *Torrent) onPieceCompleted(piece int) {
1487         t.pendingPieces.Remove(piece)
1488         t.pendAllChunkSpecs(piece)
1489         t.cancelRequestsForPiece(piece)
1490         for conn := range t.conns {
1491                 conn.Have(piece)
1492         }
1493 }
1494
1495 func (t *Torrent) onIncompletePiece(piece int) {
1496         if t.pieceAllDirty(piece) {
1497                 t.pendAllChunkSpecs(piece)
1498         }
1499         if !t.wantPieceIndex(piece) {
1500                 return
1501         }
1502         // We could drop any connections that we told we have a piece that we
1503         // don't here. But there's a test failure, and it seems clients don't care
1504         // if you request pieces that you already claim to have. Pruning bad
1505         // connections might just remove any connections that aren't treating us
1506         // favourably anyway.
1507
1508         // for c := range t.conns {
1509         //      if c.sentHave(piece) {
1510         //              c.Drop()
1511         //      }
1512         // }
1513         for conn := range t.conns {
1514                 if conn.PeerHasPiece(piece) {
1515                         conn.updateRequests()
1516                 }
1517         }
1518 }
1519
1520 func (t *Torrent) verifyPiece(piece int) {
1521         cl := t.cl
1522         cl.mu.Lock()
1523         defer cl.mu.Unlock()
1524         p := &t.pieces[piece]
1525         defer func() {
1526                 p.numVerifies++
1527                 cl.event.Broadcast()
1528         }()
1529         for p.hashing || t.storage == nil {
1530                 cl.event.Wait()
1531         }
1532         if !p.t.piecesQueuedForHash.Remove(piece) {
1533                 panic("piece was not queued")
1534         }
1535         if t.closed.IsSet() || t.pieceComplete(piece) {
1536                 t.updatePiecePriority(piece)
1537                 return
1538         }
1539         p.hashing = true
1540         t.publishPieceChange(piece)
1541         cl.mu.Unlock()
1542         sum := t.hashPiece(piece)
1543         cl.mu.Lock()
1544         p.hashing = false
1545         t.pieceHashed(piece, sum == p.hash)
1546 }
1547
1548 // Return the connections that touched a piece, and clear the entry while
1549 // doing it.
1550 func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
1551         for c := range t.conns {
1552                 if _, ok := c.peerTouchedPieces[piece]; ok {
1553                         ret = append(ret, c)
1554                         delete(c.peerTouchedPieces, piece)
1555                 }
1556         }
1557         return
1558 }
1559
1560 func (t *Torrent) connsAsSlice() (ret []*connection) {
1561         for c := range t.conns {
1562                 ret = append(ret, c)
1563         }
1564         return
1565 }
1566
1567 // Currently doesn't really queue, but should in the future.
1568 func (t *Torrent) queuePieceCheck(pieceIndex int) {
1569         piece := &t.pieces[pieceIndex]
1570         if piece.queuedForHash() {
1571                 return
1572         }
1573         t.piecesQueuedForHash.Add(pieceIndex)
1574         t.publishPieceChange(pieceIndex)
1575         go t.verifyPiece(pieceIndex)
1576 }
1577
1578 func (t *Torrent) VerifyData() {
1579         for i := range iter.N(t.NumPieces()) {
1580                 t.Piece(i).VerifyData()
1581         }
1582 }